1use std::sync::{Arc, Mutex};
11
12use mz_compute_client::metrics::{CommandMetrics, HistoryMetrics};
13use mz_ore::cast::CastFrom;
14use mz_ore::metric;
15use mz_ore::metrics::{MetricsRegistry, UIntGauge, raw};
16use mz_repr::{GlobalId, SharedRow};
17use prometheus::core::{AtomicF64, GenericCounter};
18use prometheus::proto::LabelPair;
19use prometheus::{Histogram, HistogramVec};
20
21#[derive(Clone, Debug)]
31pub struct ComputeMetrics {
32 workload_class: Arc<Mutex<Option<String>>>,
34
35 history_command_count: raw::UIntGaugeVec,
37 history_dataflow_count: raw::UIntGaugeVec,
38
39 reconciliation_reused_dataflows_count_total: raw::IntCounterVec,
41 reconciliation_replaced_dataflows_count_total: raw::IntCounterVec,
42
43 arrangement_maintenance_seconds_total: raw::CounterVec,
45 arrangement_maintenance_active_info: raw::UIntGaugeVec,
46
47 timely_step_duration_seconds: HistogramVec,
58 persist_peek_seconds: HistogramVec,
59 handle_command_duration_seconds: HistogramVec,
60
61 shared_row_heap_capacity_bytes: raw::UIntGaugeVec,
63
64 replica_expiration_timestamp_seconds: raw::UIntGaugeVec,
66 replica_expiration_remaining_seconds: raw::GaugeVec,
67
68 collection_count: raw::UIntGaugeVec,
70}
71
72impl ComputeMetrics {
73 pub fn register_with(registry: &MetricsRegistry) -> Self {
74 let workload_class = Arc::new(Mutex::new(None));
75
76 registry.register_postprocessor({
79 let workload_class = Arc::clone(&workload_class);
80 move |metrics| {
81 let workload_class: Option<String> =
82 workload_class.lock().expect("lock poisoned").clone();
83 let Some(workload_class) = workload_class else {
84 return;
85 };
86 for metric in metrics {
87 for metric in metric.mut_metric() {
88 let mut label = LabelPair::default();
89 label.set_name("workload_class".into());
90 label.set_value(workload_class.clone());
91
92 let mut labels = metric.take_label();
93 labels.push(label);
94 metric.set_label(labels);
95 }
96 }
97 }
98 });
99
100 Self {
101 workload_class,
102 history_command_count: registry.register(metric!(
103 name: "mz_compute_replica_history_command_count",
104 help: "The number of commands in the replica's command history.",
105 var_labels: ["worker_id", "command_type"],
106 )),
107 history_dataflow_count: registry.register(metric!(
108 name: "mz_compute_replica_history_dataflow_count",
109 help: "The number of dataflows in the replica's command history.",
110 var_labels: ["worker_id"],
111 )),
112 reconciliation_reused_dataflows_count_total: registry.register(metric!(
113 name: "mz_compute_reconciliation_reused_dataflows_count_total",
114 help: "The total number of dataflows that were reused during compute reconciliation.",
115 var_labels: ["worker_id"],
116 )),
117 reconciliation_replaced_dataflows_count_total: registry.register(metric!(
118 name: "mz_compute_reconciliation_replaced_dataflows_count_total",
119 help: "The total number of dataflows that were replaced during compute reconciliation.",
120 var_labels: ["worker_id", "reason"],
121 )),
122 arrangement_maintenance_seconds_total: registry.register(metric!(
123 name: "mz_arrangement_maintenance_seconds_total",
124 help: "The total time spent maintaining arrangements.",
125 var_labels: ["worker_id"],
126 )),
127 arrangement_maintenance_active_info: registry.register(metric!(
128 name: "mz_arrangement_maintenance_active_info",
129 help: "Whether maintenance is currently occuring.",
130 var_labels: ["worker_id"],
131 )),
132 timely_step_duration_seconds: registry.register(metric!(
133 name: "mz_timely_step_duration_seconds",
134 help: "The time spent in each compute step_or_park call",
135 const_labels: {"cluster" => "compute"},
136 var_labels: ["worker_id"],
137 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 32.0),
138 )),
139 shared_row_heap_capacity_bytes: registry.register(metric!(
140 name: "mz_dataflow_shared_row_heap_capacity_bytes",
141 help: "The heap capacity of the shared row.",
142 var_labels: ["worker_id"],
143 )),
144 persist_peek_seconds: registry.register(metric!(
145 name: "mz_persist_peek_seconds",
146 help: "Time spent in (experimental) Persist fast-path peeks.",
147 var_labels: ["worker_id"],
148 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
149 )),
150 handle_command_duration_seconds: registry.register(metric!(
151 name: "mz_cluster_handle_command_duration_seconds",
152 help: "Time spent in handling commands.",
153 const_labels: {"cluster" => "compute"},
154 var_labels: ["worker_id", "command_type"],
155 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
156 )),
157 replica_expiration_timestamp_seconds: registry.register(metric!(
158 name: "mz_dataflow_replica_expiration_timestamp_seconds",
159 help: "The replica expiration timestamp in seconds since epoch.",
160 var_labels: ["worker_id"],
161 )),
162 replica_expiration_remaining_seconds: registry.register(metric!(
163 name: "mz_dataflow_replica_expiration_remaining_seconds",
164 help: "The remaining seconds until replica expiration. Can go negative, can lag behind.",
165 var_labels: ["worker_id"],
166 )),
167 collection_count: registry.register(metric!(
168 name: "mz_compute_collection_count",
169 help: "The number and hydration status of maintained compute collections.",
170 var_labels: ["worker_id", "type", "hydrated"],
171 )),
172 }
173 }
174
175 pub fn set_workload_class(&self, workload_class: Option<String>) {
177 let mut guard = self.workload_class.lock().expect("lock poisoned");
178 *guard = workload_class
179 }
180
181 pub fn for_worker(&self, worker_id: usize) -> WorkerMetrics {
182 let worker = worker_id.to_string();
183 let arrangement_maintenance_seconds_total = self
184 .arrangement_maintenance_seconds_total
185 .with_label_values(&[&worker]);
186 let arrangement_maintenance_active_info = self
187 .arrangement_maintenance_active_info
188 .with_label_values(&[&worker]);
189 let timely_step_duration_seconds = self
190 .timely_step_duration_seconds
191 .with_label_values(&[&worker]);
192 let persist_peek_seconds = self.persist_peek_seconds.with_label_values(&[&worker]);
193 let handle_command_duration_seconds = CommandMetrics::build(|typ| {
194 self.handle_command_duration_seconds
195 .with_label_values(&[&worker, typ])
196 });
197 let replica_expiration_timestamp_seconds = self
198 .replica_expiration_timestamp_seconds
199 .with_label_values(&[&worker]);
200 let replica_expiration_remaining_seconds = self
201 .replica_expiration_remaining_seconds
202 .with_label_values(&[&worker]);
203 let shared_row_heap_capacity_bytes = self
204 .shared_row_heap_capacity_bytes
205 .with_label_values(&[&worker]);
206
207 WorkerMetrics {
208 worker_label: worker,
209 metrics: self.clone(),
210 arrangement_maintenance_seconds_total,
211 arrangement_maintenance_active_info,
212 timely_step_duration_seconds,
213 persist_peek_seconds,
214 handle_command_duration_seconds,
215 replica_expiration_timestamp_seconds,
216 replica_expiration_remaining_seconds,
217 shared_row_heap_capacity_bytes,
218 }
219 }
220}
221
222#[derive(Clone, Debug)]
224pub struct WorkerMetrics {
225 worker_label: String,
226 metrics: ComputeMetrics,
227
228 pub(crate) arrangement_maintenance_seconds_total: GenericCounter<AtomicF64>,
230 pub(crate) arrangement_maintenance_active_info: UIntGauge,
236 pub(crate) timely_step_duration_seconds: Histogram,
238 pub(crate) persist_peek_seconds: Histogram,
240 pub(crate) handle_command_duration_seconds: CommandMetrics<Histogram>,
242 pub(crate) replica_expiration_timestamp_seconds: UIntGauge,
244 pub(crate) replica_expiration_remaining_seconds: raw::Gauge,
246 shared_row_heap_capacity_bytes: UIntGauge,
248}
249
250impl WorkerMetrics {
251 pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
252 let command_counts = CommandMetrics::build(|typ| {
253 self.metrics
254 .history_command_count
255 .with_label_values(&[&self.worker_label, typ])
256 });
257 let dataflow_count = self
258 .metrics
259 .history_dataflow_count
260 .with_label_values(&[&self.worker_label]);
261
262 HistoryMetrics {
263 command_counts,
264 dataflow_count,
265 }
266 }
267
268 pub fn record_dataflow_reconciliation(
273 &self,
274 compatible: bool,
275 uncompacted: bool,
276 subscribe_free: bool,
277 dependencies_retained: bool,
278 ) {
279 if !compatible {
280 self.metrics
281 .reconciliation_replaced_dataflows_count_total
282 .with_label_values(&[&self.worker_label, "incompatible"])
283 .inc();
284 } else if !uncompacted {
285 self.metrics
286 .reconciliation_replaced_dataflows_count_total
287 .with_label_values(&[&self.worker_label, "compacted"])
288 .inc();
289 } else if !subscribe_free {
290 self.metrics
291 .reconciliation_replaced_dataflows_count_total
292 .with_label_values(&[&self.worker_label, "subscribe"])
293 .inc();
294 } else if !dependencies_retained {
295 self.metrics
296 .reconciliation_replaced_dataflows_count_total
297 .with_label_values(&[&self.worker_label, "dependency"])
298 .inc();
299 } else {
300 self.metrics
301 .reconciliation_reused_dataflows_count_total
302 .with_label_values(&[&self.worker_label])
303 .inc();
304 }
305 }
306
307 pub fn record_shared_row_metrics(&self) {
309 let binding = SharedRow::get();
310 self.shared_row_heap_capacity_bytes
311 .set(u64::cast_from(binding.borrow().byte_capacity()));
312 }
313
314 fn inc_collection_count(&self, collection_type: &str, hydrated: bool) {
316 let hydrated = if hydrated { "1" } else { "0" };
317 self.metrics
318 .collection_count
319 .with_label_values(&[&self.worker_label, collection_type, hydrated])
320 .inc();
321 }
322
323 fn dec_collection_count(&self, collection_type: &str, hydrated: bool) {
325 let hydrated = if hydrated { "1" } else { "0" };
326 self.metrics
327 .collection_count
328 .with_label_values(&[&self.worker_label, collection_type, hydrated])
329 .dec();
330 }
331
332 pub fn set_workload_class(&self, workload_class: Option<String>) {
334 self.metrics.set_workload_class(workload_class);
335 }
336
337 pub fn for_collection(&self, id: GlobalId) -> CollectionMetrics {
338 CollectionMetrics::new(id, self.clone())
339 }
340}
341
342#[derive(Clone, Debug)]
348pub struct CollectionMetrics {
349 metrics: WorkerMetrics,
350 collection_type: &'static str,
351 collection_hydrated: bool,
352}
353
354impl CollectionMetrics {
355 pub fn new(collection_id: GlobalId, metrics: WorkerMetrics) -> Self {
356 let collection_type = match collection_id {
357 GlobalId::System(_) => "system",
358 GlobalId::IntrospectionSourceIndex(_) => "log",
359 GlobalId::User(_) => "user",
360 GlobalId::Transient(_) => "transient",
361 GlobalId::Explain => "explain",
362 };
363 let collection_hydrated = false;
364
365 metrics.inc_collection_count(collection_type, collection_hydrated);
366
367 Self {
368 metrics,
369 collection_type,
370 collection_hydrated,
371 }
372 }
373
374 pub fn record_collection_hydrated(&mut self) {
376 if self.collection_hydrated {
377 return;
378 }
379
380 self.metrics
381 .dec_collection_count(self.collection_type, false);
382 self.metrics
383 .inc_collection_count(self.collection_type, true);
384 self.collection_hydrated = true;
385 }
386}
387
388impl Drop for CollectionMetrics {
389 fn drop(&mut self) {
390 self.metrics
391 .dec_collection_count(self.collection_type, self.collection_hydrated);
392 }
393}