1use std::sync::{Arc, Mutex};
11
12use mz_compute_client::metrics::{CommandMetrics, HistoryMetrics};
13use mz_ore::cast::CastFrom;
14use mz_ore::metric;
15use mz_ore::metrics::{MetricsRegistry, UIntGauge, raw};
16use mz_repr::{GlobalId, SharedRow};
17use prometheus::core::{AtomicF64, GenericCounter};
18use prometheus::proto::LabelPair;
19use prometheus::{Histogram, HistogramVec, IntCounter};
20
#[derive(Clone, Debug)]
pub struct ComputeMetrics {
    /// The current workload class, if any. Read by the metrics postprocessor
    /// registered in `register_with`, which stamps it onto every exported
    /// metric as a `workload_class` label.
    workload_class: Arc<Mutex<Option<String>>>,

    /// The number of commands in the replica's command history, by worker and
    /// command type.
    history_command_count: raw::UIntGaugeVec,
    /// The number of dataflows in the replica's command history, by worker.
    history_dataflow_count: raw::UIntGaugeVec,

    /// The total number of dataflows that were reused during compute
    /// reconciliation, by worker.
    reconciliation_reused_dataflows_count_total: raw::IntCounterVec,
    /// The total number of dataflows that were replaced during compute
    /// reconciliation, by worker and replacement reason.
    reconciliation_replaced_dataflows_count_total: raw::IntCounterVec,

    /// The total time spent maintaining arrangements, by worker.
    arrangement_maintenance_seconds_total: raw::CounterVec,
    /// Whether arrangement maintenance is currently occurring, by worker.
    arrangement_maintenance_active_info: raw::UIntGaugeVec,

    /// Histogram of the time spent in each compute `step_or_park` call, by worker.
    timely_step_duration_seconds: HistogramVec,
    /// Histogram of time spent in (experimental) Persist fast-path peeks, by worker.
    persist_peek_seconds: HistogramVec,
    /// Histogram of time spent stashing peek results in the peek result stash, by worker.
    stashed_peek_seconds: HistogramVec,
    /// Histogram of time spent handling commands, by worker and command type.
    handle_command_duration_seconds: HistogramVec,

    // Fine-grained timings for the phases of index peek processing. These are
    // not labeled by worker; all workers observe into the same histogram.
    /// Total time processing index peeks, excluding stashed peek responses.
    index_peek_total_seconds: Histogram,
    /// Time in the `seek_fulfillment` phase.
    index_peek_seek_fulfillment_seconds: Histogram,
    /// Time scanning the error trace for errors.
    index_peek_error_scan_seconds: Histogram,
    /// Time setting up cursor and literal constraints.
    index_peek_cursor_setup_seconds: Histogram,
    /// Time iterating rows and evaluating MFP.
    index_peek_row_iteration_seconds: Histogram,
    /// Time sorting intermediate results during peek collection.
    index_peek_result_sort_seconds: Histogram,
    /// Time checking trace frontiers.
    index_peek_frontier_check_seconds: Histogram,
    /// Time constructing a `RowCollection` from peek results.
    index_peek_row_collection_seconds: Histogram,

    /// The heap capacity of the shared row, by worker.
    shared_row_heap_capacity_bytes: raw::UIntGaugeVec,

    /// The replica expiration timestamp in seconds since epoch, by worker.
    replica_expiration_timestamp_seconds: raw::UIntGaugeVec,
    /// The remaining seconds until replica expiration, by worker. Can go
    /// negative and can lag behind.
    replica_expiration_remaining_seconds: raw::GaugeVec,

    /// The number of maintained compute collections, by worker, collection
    /// type, and hydration status.
    collection_count: raw::UIntGaugeVec,

    /// The number of collection snapshots skipped by the subscribe snapshot
    /// optimization.
    subscribe_snapshots_skipped_total: IntCounter,
}
85
86impl ComputeMetrics {
87 pub fn register_with(registry: &MetricsRegistry) -> Self {
88 let workload_class = Arc::new(Mutex::new(None));
89
90 registry.register_postprocessor({
93 let workload_class = Arc::clone(&workload_class);
94 move |metrics| {
95 let workload_class: Option<String> =
96 workload_class.lock().expect("lock poisoned").clone();
97 let Some(workload_class) = workload_class else {
98 return;
99 };
100 for metric in metrics {
101 for metric in metric.mut_metric() {
102 let mut label = LabelPair::default();
103 label.set_name("workload_class".into());
104 label.set_value(workload_class.clone());
105
106 let mut labels = metric.take_label();
107 labels.push(label);
108 metric.set_label(labels);
109 }
110 }
111 }
112 });
113
114 Self {
115 workload_class,
116 history_command_count: registry.register(metric!(
117 name: "mz_compute_replica_history_command_count",
118 help: "The number of commands in the replica's command history.",
119 var_labels: ["worker_id", "command_type"],
120 )),
121 history_dataflow_count: registry.register(metric!(
122 name: "mz_compute_replica_history_dataflow_count",
123 help: "The number of dataflows in the replica's command history.",
124 var_labels: ["worker_id"],
125 )),
126 reconciliation_reused_dataflows_count_total: registry.register(metric!(
127 name: "mz_compute_reconciliation_reused_dataflows_count_total",
128 help: "The total number of dataflows that were reused during compute reconciliation.",
129 var_labels: ["worker_id"],
130 )),
131 reconciliation_replaced_dataflows_count_total: registry.register(metric!(
132 name: "mz_compute_reconciliation_replaced_dataflows_count_total",
133 help: "The total number of dataflows that were replaced during compute reconciliation.",
134 var_labels: ["worker_id", "reason"],
135 )),
136 arrangement_maintenance_seconds_total: registry.register(metric!(
137 name: "mz_arrangement_maintenance_seconds_total",
138 help: "The total time spent maintaining arrangements.",
139 var_labels: ["worker_id"],
140 )),
141 arrangement_maintenance_active_info: registry.register(metric!(
142 name: "mz_arrangement_maintenance_active_info",
143 help: "Whether maintenance is currently occuring.",
144 var_labels: ["worker_id"],
145 )),
146 timely_step_duration_seconds: registry.register(metric!(
147 name: "mz_timely_step_duration_seconds",
148 help: "The time spent in each compute step_or_park call",
149 const_labels: {"cluster" => "compute"},
150 var_labels: ["worker_id"],
151 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 32.0),
152 )),
153 shared_row_heap_capacity_bytes: registry.register(metric!(
154 name: "mz_dataflow_shared_row_heap_capacity_bytes",
155 help: "The heap capacity of the shared row.",
156 var_labels: ["worker_id"],
157 )),
158 persist_peek_seconds: registry.register(metric!(
159 name: "mz_persist_peek_seconds",
160 help: "Time spent in (experimental) Persist fast-path peeks.",
161 var_labels: ["worker_id"],
162 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
163 )),
164 stashed_peek_seconds: registry.register(metric!(
165 name: "mz_stashed_peek_seconds",
166 help: "Time spent reading a peek result and stashing it in the peek result stash (aka. persist blob).",
167 var_labels: ["worker_id"],
168 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
169 )),
170 handle_command_duration_seconds: registry.register(metric!(
171 name: "mz_cluster_handle_command_duration_seconds",
172 help: "Time spent in handling commands.",
173 const_labels: {"cluster" => "compute"},
174 var_labels: ["worker_id", "command_type"],
175 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
176 )),
177 index_peek_total_seconds: registry.register(metric!(
178 name: "mz_index_peek_total_seconds",
179 help: "Total time processing index peeks, from process_peek entry to response. Excluding peeks that use the peek response stash.",
180 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
181 )),
182 index_peek_seek_fulfillment_seconds: registry.register(metric!(
183 name: "mz_index_peek_seek_fulfillment_seconds",
184 help: "Time in seek_fulfillment method including frontier checks and data collection.",
185 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
186 )),
187 index_peek_error_scan_seconds: registry.register(metric!(
188 name: "mz_index_peek_error_scan_seconds",
189 help: "Time scanning the error trace for errors.",
190 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
191 )),
192 index_peek_cursor_setup_seconds: registry.register(metric!(
193 name: "mz_index_peek_cursor_setup_seconds",
194 help: "Time setting up cursor and literal constraints.",
195 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
196 )),
197 index_peek_row_iteration_seconds: registry.register(metric!(
198 name: "mz_index_peek_row_iteration_seconds",
199 help: "Time iterating rows and evaluating MFP.",
200 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
201 )),
202 index_peek_result_sort_seconds: registry.register(metric!(
203 name: "mz_index_peek_result_sort_seconds",
204 help: "Time sorting intermediate results during peek collection.",
205 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
206 )),
207 index_peek_frontier_check_seconds: registry.register(metric!(
208 name: "mz_index_peek_frontier_check_seconds",
209 help: "Time checking trace frontiers.",
210 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
211 )),
212 index_peek_row_collection_seconds: registry.register(metric!(
213 name: "mz_index_peek_row_collection_seconds",
214 help: "Time constructing RowCollection from peek results.",
215 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
216 )),
217 replica_expiration_timestamp_seconds: registry.register(metric!(
218 name: "mz_dataflow_replica_expiration_timestamp_seconds",
219 help: "The replica expiration timestamp in seconds since epoch.",
220 var_labels: ["worker_id"],
221 )),
222 replica_expiration_remaining_seconds: registry.register(metric!(
223 name: "mz_dataflow_replica_expiration_remaining_seconds",
224 help: "The remaining seconds until replica expiration. Can go negative, can lag behind.",
225 var_labels: ["worker_id"],
226 )),
227 collection_count: registry.register(metric!(
228 name: "mz_compute_collection_count",
229 help: "The number and hydration status of maintained compute collections.",
230 var_labels: ["worker_id", "type", "hydrated"],
231 )),
232 subscribe_snapshots_skipped_total: registry.register(metric!(
233 name: "mz_subscribe_snapshots_skipped_total",
234 help: "The number of collection snapshots that were skipped by the subscribe snapshot optimization.",
235 )),
236 }
237 }
238
239 pub fn set_workload_class(&self, workload_class: Option<String>) {
241 let mut guard = self.workload_class.lock().expect("lock poisoned");
242 *guard = workload_class
243 }
244
245 pub fn for_worker(&self, worker_id: usize) -> WorkerMetrics {
246 let worker = worker_id.to_string();
247 let arrangement_maintenance_seconds_total = self
248 .arrangement_maintenance_seconds_total
249 .with_label_values(&[&worker]);
250 let arrangement_maintenance_active_info = self
251 .arrangement_maintenance_active_info
252 .with_label_values(&[&worker]);
253 let timely_step_duration_seconds = self
254 .timely_step_duration_seconds
255 .with_label_values(&[&worker]);
256 let persist_peek_seconds = self.persist_peek_seconds.with_label_values(&[&worker]);
257 let stashed_peek_seconds = self.stashed_peek_seconds.with_label_values(&[&worker]);
258 let handle_command_duration_seconds = CommandMetrics::build(|typ| {
259 self.handle_command_duration_seconds
260 .with_label_values(&[worker.as_ref(), typ])
261 });
262 let index_peek_total_seconds = self.index_peek_total_seconds.clone();
263 let index_peek_seek_fulfillment_seconds = self.index_peek_seek_fulfillment_seconds.clone();
264 let index_peek_error_scan_seconds = self.index_peek_error_scan_seconds.clone();
265 let index_peek_cursor_setup_seconds = self.index_peek_cursor_setup_seconds.clone();
266 let index_peek_row_iteration_seconds = self.index_peek_row_iteration_seconds.clone();
267 let index_peek_result_sort_seconds = self.index_peek_result_sort_seconds.clone();
268 let index_peek_frontier_check_seconds = self.index_peek_frontier_check_seconds.clone();
269 let index_peek_row_collection_seconds = self.index_peek_row_collection_seconds.clone();
270 let replica_expiration_timestamp_seconds = self
271 .replica_expiration_timestamp_seconds
272 .with_label_values(&[&worker]);
273 let replica_expiration_remaining_seconds = self
274 .replica_expiration_remaining_seconds
275 .with_label_values(&[&worker]);
276 let shared_row_heap_capacity_bytes = self
277 .shared_row_heap_capacity_bytes
278 .with_label_values(&[&worker]);
279
280 WorkerMetrics {
281 worker_label: worker,
282 metrics: self.clone(),
283 arrangement_maintenance_seconds_total,
284 arrangement_maintenance_active_info,
285 timely_step_duration_seconds,
286 persist_peek_seconds,
287 stashed_peek_seconds,
288 handle_command_duration_seconds,
289 index_peek_total_seconds,
290 index_peek_seek_fulfillment_seconds,
291 index_peek_error_scan_seconds,
292 index_peek_cursor_setup_seconds,
293 index_peek_row_iteration_seconds,
294 index_peek_result_sort_seconds,
295 index_peek_frontier_check_seconds,
296 index_peek_row_collection_seconds,
297 replica_expiration_timestamp_seconds,
298 replica_expiration_remaining_seconds,
299 shared_row_heap_capacity_bytes,
300 }
301 }
302}
303
#[derive(Clone, Debug)]
pub struct WorkerMetrics {
    /// The worker ID, stringified, used as the `worker_id` label value.
    worker_label: String,
    /// The unlabeled parent metrics, kept around to derive further labeled
    /// metrics (history, reconciliation, collection counts) on demand.
    metrics: ComputeMetrics,

    /// The total time spent maintaining arrangements, for this worker.
    pub(crate) arrangement_maintenance_seconds_total: GenericCounter<AtomicF64>,
    /// Whether arrangement maintenance is currently occurring, for this worker.
    pub(crate) arrangement_maintenance_active_info: UIntGauge,
    /// Histogram of the time spent in each compute step_or_park call, for this worker.
    pub(crate) timely_step_duration_seconds: Histogram,
    /// Histogram of time spent in (experimental) Persist fast-path peeks, for this worker.
    pub(crate) persist_peek_seconds: Histogram,
    /// Histogram of time spent stashing peek results, for this worker.
    pub(crate) stashed_peek_seconds: Histogram,
    /// Histograms of command handling durations, per command type, for this worker.
    pub(crate) handle_command_duration_seconds: CommandMetrics<Histogram>,
    // The index peek histograms below carry no worker label; they are shared
    // across all workers (see `ComputeMetrics::for_worker`).
    /// Total time processing index peeks, excluding stashed peek responses.
    pub(crate) index_peek_total_seconds: Histogram,
    /// Time in the seek_fulfillment phase of index peeks.
    pub(crate) index_peek_seek_fulfillment_seconds: Histogram,
    /// Time scanning the error trace during index peeks.
    pub(crate) index_peek_error_scan_seconds: Histogram,
    /// Time setting up cursor and literal constraints during index peeks.
    pub(crate) index_peek_cursor_setup_seconds: Histogram,
    /// Time iterating rows and evaluating MFP during index peeks.
    pub(crate) index_peek_row_iteration_seconds: Histogram,
    /// Time sorting intermediate results during index peek collection.
    pub(crate) index_peek_result_sort_seconds: Histogram,
    /// Time checking trace frontiers during index peeks.
    pub(crate) index_peek_frontier_check_seconds: Histogram,
    /// Time constructing a RowCollection from index peek results.
    pub(crate) index_peek_row_collection_seconds: Histogram,
    /// The replica expiration timestamp in seconds since epoch, for this worker.
    pub(crate) replica_expiration_timestamp_seconds: UIntGauge,
    /// The remaining seconds until replica expiration, for this worker.
    pub(crate) replica_expiration_remaining_seconds: raw::Gauge,
    /// The heap capacity of the shared row, for this worker; updated by
    /// `record_shared_row_metrics`.
    shared_row_heap_capacity_bytes: UIntGauge,
}
349
350impl WorkerMetrics {
351 pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
352 let command_counts = CommandMetrics::build(|typ| {
353 self.metrics
354 .history_command_count
355 .with_label_values(&[self.worker_label.as_ref(), typ])
356 });
357 let dataflow_count = self
358 .metrics
359 .history_dataflow_count
360 .with_label_values(&[&self.worker_label]);
361
362 HistoryMetrics {
363 command_counts,
364 dataflow_count,
365 }
366 }
367
368 pub fn record_dataflow_reconciliation(
382 &self,
383 compatible: bool,
384 uncompacted: bool,
385 subscribe_free: bool,
386 copy_to_free: bool,
387 dependencies_retained: bool,
388 ) {
389 if !compatible {
390 self.metrics
391 .reconciliation_replaced_dataflows_count_total
392 .with_label_values(&[self.worker_label.as_ref(), "incompatible"])
393 .inc();
394 } else if !uncompacted {
395 self.metrics
396 .reconciliation_replaced_dataflows_count_total
397 .with_label_values(&[self.worker_label.as_ref(), "compacted"])
398 .inc();
399 } else if !subscribe_free {
400 self.metrics
401 .reconciliation_replaced_dataflows_count_total
402 .with_label_values(&[self.worker_label.as_ref(), "subscribe"])
403 .inc();
404 } else if !copy_to_free {
405 self.metrics
406 .reconciliation_replaced_dataflows_count_total
407 .with_label_values(&[self.worker_label.as_ref(), "copy-to"])
408 .inc();
409 } else if !dependencies_retained {
410 self.metrics
411 .reconciliation_replaced_dataflows_count_total
412 .with_label_values(&[self.worker_label.as_ref(), "dependency"])
413 .inc();
414 } else {
415 self.metrics
416 .reconciliation_reused_dataflows_count_total
417 .with_label_values(&[&self.worker_label])
418 .inc();
419 }
420 }
421
422 pub fn record_shared_row_metrics(&self) {
424 let binding = SharedRow::get();
425 self.shared_row_heap_capacity_bytes
426 .set(u64::cast_from(binding.byte_capacity()));
427 }
428
429 fn inc_collection_count(&self, collection_type: &str, hydrated: bool) {
431 let hydrated = if hydrated { "1" } else { "0" };
432 self.metrics
433 .collection_count
434 .with_label_values(&[self.worker_label.as_ref(), collection_type, hydrated])
435 .inc();
436 }
437
438 fn dec_collection_count(&self, collection_type: &str, hydrated: bool) {
440 let hydrated = if hydrated { "1" } else { "0" };
441 self.metrics
442 .collection_count
443 .with_label_values(&[self.worker_label.as_ref(), collection_type, hydrated])
444 .dec();
445 }
446
447 pub fn inc_subscribe_snapshot_optimization(&self) {
448 self.metrics.subscribe_snapshots_skipped_total.inc()
449 }
450
451 pub fn set_workload_class(&self, workload_class: Option<String>) {
453 self.metrics.set_workload_class(workload_class);
454 }
455
456 pub fn for_collection(&self, id: GlobalId) -> CollectionMetrics {
457 CollectionMetrics::new(id, self.clone())
458 }
459}
460
#[derive(Clone, Debug)]
pub struct CollectionMetrics {
    /// The worker metrics through which collection counts are maintained.
    metrics: WorkerMetrics,
    /// The collection's type label ("system", "log", "user", "transient", or
    /// "explain"), derived from its `GlobalId` variant.
    collection_type: &'static str,
    /// Whether the collection has been recorded as hydrated. Determines which
    /// gauge bucket this collection currently occupies (and is decremented on
    /// drop).
    collection_hydrated: bool,
}
472
473impl CollectionMetrics {
474 pub fn new(collection_id: GlobalId, metrics: WorkerMetrics) -> Self {
475 let collection_type = match collection_id {
476 GlobalId::System(_) => "system",
477 GlobalId::IntrospectionSourceIndex(_) => "log",
478 GlobalId::User(_) => "user",
479 GlobalId::Transient(_) => "transient",
480 GlobalId::Explain => "explain",
481 };
482 let collection_hydrated = false;
483
484 metrics.inc_collection_count(collection_type, collection_hydrated);
485
486 Self {
487 metrics,
488 collection_type,
489 collection_hydrated,
490 }
491 }
492
493 pub fn record_collection_hydrated(&mut self) {
495 if self.collection_hydrated {
496 return;
497 }
498
499 self.metrics
500 .dec_collection_count(self.collection_type, false);
501 self.metrics
502 .inc_collection_count(self.collection_type, true);
503 self.collection_hydrated = true;
504 }
505}
506
507impl Drop for CollectionMetrics {
508 fn drop(&mut self) {
509 self.metrics
510 .dec_collection_count(self.collection_type, self.collection_hydrated);
511 }
512}