1use std::sync::{Arc, Mutex};
11
12use mz_compute_client::metrics::{CommandMetrics, HistoryMetrics};
13use mz_ore::cast::CastFrom;
14use mz_ore::metric;
15use mz_ore::metrics::{MetricTag, MetricVisibility, MetricsRegistry, UIntGauge, raw};
16use mz_repr::{GlobalId, SharedRow};
17use prometheus::core::{AtomicF64, GenericCounter};
18use prometheus::proto::LabelPair;
19use prometheus::{Histogram, HistogramVec, IntCounter};
20
#[derive(Clone, Debug)]
/// Metrics exported by a compute replica.
///
/// [`ComputeMetrics::register_with`] registers every metric with a [`MetricsRegistry`], and
/// [`ComputeMetrics::for_worker`] derives handles labeled for a single worker from the vectors
/// stored here.
pub struct ComputeMetrics {
    /// Value of the `workload_class` label; `None` means no such label is added.
    ///
    /// This cell is shared with the registry postprocessor installed by `register_with`, which
    /// reads it on every invocation.
    workload_class: Arc<Mutex<Option<String>>>,

    /// `mz_compute_replica_history_command_count`, labeled by `worker_id` and `command_type`.
    history_command_count: raw::UIntGaugeVec,
    /// `mz_compute_replica_history_dataflow_count`, labeled by `worker_id`.
    history_dataflow_count: raw::UIntGaugeVec,

    /// `mz_compute_reconciliation_reused_dataflows_count_total`, labeled by `worker_id`.
    reconciliation_reused_dataflows_count_total: raw::IntCounterVec,
    /// `mz_compute_reconciliation_replaced_dataflows_count_total`, labeled by `worker_id` and
    /// `reason`.
    reconciliation_replaced_dataflows_count_total: raw::IntCounterVec,

    /// `mz_arrangement_maintenance_seconds_total`, labeled by `worker_id`.
    arrangement_maintenance_seconds_total: raw::CounterVec,
    /// `mz_arrangement_maintenance_active_info`, labeled by `worker_id`.
    arrangement_maintenance_active_info: raw::UIntGaugeVec,

    /// `mz_timely_step_duration_seconds` (const label `cluster=compute`), labeled by `worker_id`.
    timely_step_duration_seconds: HistogramVec,
    /// `mz_persist_peek_seconds`, labeled by `worker_id`.
    persist_peek_seconds: HistogramVec,
    /// `mz_stashed_peek_seconds`, labeled by `worker_id`.
    stashed_peek_seconds: HistogramVec,
    /// `mz_cluster_handle_command_duration_seconds` (const label `cluster=compute`), labeled by
    /// `worker_id` and `command_type`.
    handle_command_duration_seconds: HistogramVec,

    // The `mz_index_peek_*` histograms below carry no variable labels; all workers share them.
    /// `mz_index_peek_total_seconds`.
    index_peek_total_seconds: Histogram,
    /// `mz_index_peek_seek_fulfillment_seconds`.
    index_peek_seek_fulfillment_seconds: Histogram,
    /// `mz_index_peek_error_scan_seconds`.
    index_peek_error_scan_seconds: Histogram,
    /// `mz_index_peek_cursor_setup_seconds`.
    index_peek_cursor_setup_seconds: Histogram,
    /// `mz_index_peek_row_iteration_seconds`.
    index_peek_row_iteration_seconds: Histogram,
    /// `mz_index_peek_result_sort_seconds`.
    index_peek_result_sort_seconds: Histogram,
    /// `mz_index_peek_frontier_check_seconds`.
    index_peek_frontier_check_seconds: Histogram,
    /// `mz_index_peek_row_collection_seconds`.
    index_peek_row_collection_seconds: Histogram,

    /// `mz_dataflow_shared_row_heap_capacity_bytes`, labeled by `worker_id`.
    shared_row_heap_capacity_bytes: raw::UIntGaugeVec,

    /// `mz_dataflow_replica_expiration_timestamp_seconds`, labeled by `worker_id`.
    replica_expiration_timestamp_seconds: raw::UIntGaugeVec,
    /// `mz_dataflow_replica_expiration_remaining_seconds`, labeled by `worker_id`.
    replica_expiration_remaining_seconds: raw::GaugeVec,

    /// `mz_compute_collection_count`, labeled by `worker_id`, `type`, and `hydrated`.
    collection_count: raw::UIntGaugeVec,

    /// `mz_subscribe_snapshots_skipped_total` (no variable labels).
    subscribe_snapshots_skipped_total: IntCounter,
}
85
86impl ComputeMetrics {
87 pub fn register_with(registry: &MetricsRegistry) -> Self {
88 let workload_class = Arc::new(Mutex::new(None));
89
90 registry.register_postprocessor({
93 let workload_class = Arc::clone(&workload_class);
94 move |metrics| {
95 let workload_class: Option<String> =
96 workload_class.lock().expect("lock poisoned").clone();
97 let Some(workload_class) = workload_class else {
98 return;
99 };
100 for metric in metrics {
101 for metric in metric.mut_metric() {
102 let mut label = LabelPair::default();
103 label.set_name("workload_class".into());
104 label.set_value(workload_class.clone());
105
106 let mut labels = metric.take_label();
107 labels.push(label);
108 metric.set_label(labels);
109 }
110 }
111 }
112 });
113
114 Self {
115 workload_class,
116 history_command_count: registry.register(metric!(
117 name: "mz_compute_replica_history_command_count",
118 help: "The number of commands in the replica's command history.",
119 var_labels: ["worker_id", "command_type"],
120 )),
121 history_dataflow_count: registry.register(metric!(
122 name: "mz_compute_replica_history_dataflow_count",
123 help: "The number of dataflows in the replica's command history.",
124 var_labels: ["worker_id"],
125 visibility: MetricVisibility::Public,
126 tags: [MetricTag::Compute],
127 )),
128 reconciliation_reused_dataflows_count_total: registry.register(metric!(
129 name: "mz_compute_reconciliation_reused_dataflows_count_total",
130 help: "The total number of dataflows that were reused during compute reconciliation.",
131 var_labels: ["worker_id"],
132 )),
133 reconciliation_replaced_dataflows_count_total: registry.register(metric!(
134 name: "mz_compute_reconciliation_replaced_dataflows_count_total",
135 help: "The total number of dataflows that were replaced during compute reconciliation.",
136 var_labels: ["worker_id", "reason"],
137 )),
138 arrangement_maintenance_seconds_total: registry.register(metric!(
139 name: "mz_arrangement_maintenance_seconds_total",
140 help: "The total time spent maintaining arrangements.",
141 var_labels: ["worker_id"],
142 visibility: MetricVisibility::Public,
143 tags: [MetricTag::Compute],
144 )),
145 arrangement_maintenance_active_info: registry.register(metric!(
146 name: "mz_arrangement_maintenance_active_info",
147 help: "Whether maintenance is currently occuring.",
148 var_labels: ["worker_id"],
149 )),
150 timely_step_duration_seconds: registry.register(metric!(
151 name: "mz_timely_step_duration_seconds",
152 help: "The time spent in each compute step_or_park call",
153 const_labels: {"cluster" => "compute"},
154 var_labels: ["worker_id"],
155 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 32.0),
156 )),
157 shared_row_heap_capacity_bytes: registry.register(metric!(
158 name: "mz_dataflow_shared_row_heap_capacity_bytes",
159 help: "The heap capacity of the shared row.",
160 var_labels: ["worker_id"],
161 )),
162 persist_peek_seconds: registry.register(metric!(
163 name: "mz_persist_peek_seconds",
164 help: "Time spent in (experimental) Persist fast-path peeks.",
165 var_labels: ["worker_id"],
166 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
167 )),
168 stashed_peek_seconds: registry.register(metric!(
169 name: "mz_stashed_peek_seconds",
170 help: "Time spent reading a peek result and stashing it in the peek result stash (aka. persist blob).",
171 var_labels: ["worker_id"],
172 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
173 )),
174 handle_command_duration_seconds: registry.register(metric!(
175 name: "mz_cluster_handle_command_duration_seconds",
176 help: "Time spent in handling commands.",
177 const_labels: {"cluster" => "compute"},
178 var_labels: ["worker_id", "command_type"],
179 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
180 )),
181 index_peek_total_seconds: registry.register(metric!(
182 name: "mz_index_peek_total_seconds",
183 help: "Total time processing index peeks, from process_peek entry to response. Excluding peeks that use the peek response stash.",
184 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
185 )),
186 index_peek_seek_fulfillment_seconds: registry.register(metric!(
187 name: "mz_index_peek_seek_fulfillment_seconds",
188 help: "Time in seek_fulfillment method including frontier checks and data collection.",
189 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
190 )),
191 index_peek_error_scan_seconds: registry.register(metric!(
192 name: "mz_index_peek_error_scan_seconds",
193 help: "Time scanning the error trace for errors.",
194 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
195 )),
196 index_peek_cursor_setup_seconds: registry.register(metric!(
197 name: "mz_index_peek_cursor_setup_seconds",
198 help: "Time setting up cursor and literal constraints.",
199 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
200 )),
201 index_peek_row_iteration_seconds: registry.register(metric!(
202 name: "mz_index_peek_row_iteration_seconds",
203 help: "Time iterating rows and evaluating MFP.",
204 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
205 )),
206 index_peek_result_sort_seconds: registry.register(metric!(
207 name: "mz_index_peek_result_sort_seconds",
208 help: "Time sorting intermediate results during peek collection.",
209 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
210 )),
211 index_peek_frontier_check_seconds: registry.register(metric!(
212 name: "mz_index_peek_frontier_check_seconds",
213 help: "Time checking trace frontiers.",
214 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
215 )),
216 index_peek_row_collection_seconds: registry.register(metric!(
217 name: "mz_index_peek_row_collection_seconds",
218 help: "Time constructing RowCollection from peek results.",
219 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
220 )),
221 replica_expiration_timestamp_seconds: registry.register(metric!(
222 name: "mz_dataflow_replica_expiration_timestamp_seconds",
223 help: "The replica expiration timestamp in seconds since epoch.",
224 var_labels: ["worker_id"],
225 )),
226 replica_expiration_remaining_seconds: registry.register(metric!(
227 name: "mz_dataflow_replica_expiration_remaining_seconds",
228 help: "The remaining seconds until replica expiration. Can go negative, can lag behind.",
229 var_labels: ["worker_id"],
230 )),
231 collection_count: registry.register(metric!(
232 name: "mz_compute_collection_count",
233 help: "The number and hydration status of maintained compute collections.",
234 var_labels: ["worker_id", "type", "hydrated"],
235 )),
236 subscribe_snapshots_skipped_total: registry.register(metric!(
237 name: "mz_subscribe_snapshots_skipped_total",
238 help: "The number of collection snapshots that were skipped by the subscribe snapshot optimization.",
239 )),
240 }
241 }
242
243 pub fn set_workload_class(&self, workload_class: Option<String>) {
245 let mut guard = self.workload_class.lock().expect("lock poisoned");
246 *guard = workload_class
247 }
248
249 pub fn for_worker(&self, worker_id: usize) -> WorkerMetrics {
250 let worker = worker_id.to_string();
251 let arrangement_maintenance_seconds_total = self
252 .arrangement_maintenance_seconds_total
253 .with_label_values(&[&worker]);
254 let arrangement_maintenance_active_info = self
255 .arrangement_maintenance_active_info
256 .with_label_values(&[&worker]);
257 let timely_step_duration_seconds = self
258 .timely_step_duration_seconds
259 .with_label_values(&[&worker]);
260 let persist_peek_seconds = self.persist_peek_seconds.with_label_values(&[&worker]);
261 let stashed_peek_seconds = self.stashed_peek_seconds.with_label_values(&[&worker]);
262 let handle_command_duration_seconds = CommandMetrics::build(|typ| {
263 self.handle_command_duration_seconds
264 .with_label_values(&[worker.as_ref(), typ])
265 });
266 let index_peek_total_seconds = self.index_peek_total_seconds.clone();
267 let index_peek_seek_fulfillment_seconds = self.index_peek_seek_fulfillment_seconds.clone();
268 let index_peek_error_scan_seconds = self.index_peek_error_scan_seconds.clone();
269 let index_peek_cursor_setup_seconds = self.index_peek_cursor_setup_seconds.clone();
270 let index_peek_row_iteration_seconds = self.index_peek_row_iteration_seconds.clone();
271 let index_peek_result_sort_seconds = self.index_peek_result_sort_seconds.clone();
272 let index_peek_frontier_check_seconds = self.index_peek_frontier_check_seconds.clone();
273 let index_peek_row_collection_seconds = self.index_peek_row_collection_seconds.clone();
274 let replica_expiration_timestamp_seconds = self
275 .replica_expiration_timestamp_seconds
276 .with_label_values(&[&worker]);
277 let replica_expiration_remaining_seconds = self
278 .replica_expiration_remaining_seconds
279 .with_label_values(&[&worker]);
280 let shared_row_heap_capacity_bytes = self
281 .shared_row_heap_capacity_bytes
282 .with_label_values(&[&worker]);
283
284 WorkerMetrics {
285 worker_label: worker,
286 metrics: self.clone(),
287 arrangement_maintenance_seconds_total,
288 arrangement_maintenance_active_info,
289 timely_step_duration_seconds,
290 persist_peek_seconds,
291 stashed_peek_seconds,
292 handle_command_duration_seconds,
293 index_peek_total_seconds,
294 index_peek_seek_fulfillment_seconds,
295 index_peek_error_scan_seconds,
296 index_peek_cursor_setup_seconds,
297 index_peek_row_iteration_seconds,
298 index_peek_result_sort_seconds,
299 index_peek_frontier_check_seconds,
300 index_peek_row_collection_seconds,
301 replica_expiration_timestamp_seconds,
302 replica_expiration_remaining_seconds,
303 shared_row_heap_capacity_bytes,
304 }
305 }
306}
307
#[derive(Clone, Debug)]
/// Compute metric handles bound to a single worker, as produced by
/// [`ComputeMetrics::for_worker`].
pub struct WorkerMetrics {
    /// The worker ID rendered as a string, used as the `worker_id` label value.
    worker_label: String,
    /// The shared metric vectors, used to derive further labeled handles on demand.
    metrics: ComputeMetrics,

    /// This worker's `mz_arrangement_maintenance_seconds_total` counter.
    pub(crate) arrangement_maintenance_seconds_total: GenericCounter<AtomicF64>,
    /// This worker's `mz_arrangement_maintenance_active_info` gauge.
    pub(crate) arrangement_maintenance_active_info: UIntGauge,
    /// This worker's `mz_timely_step_duration_seconds` histogram.
    pub(crate) timely_step_duration_seconds: Histogram,
    /// This worker's `mz_persist_peek_seconds` histogram.
    pub(crate) persist_peek_seconds: Histogram,
    /// This worker's `mz_stashed_peek_seconds` histogram.
    pub(crate) stashed_peek_seconds: Histogram,
    /// This worker's `mz_cluster_handle_command_duration_seconds` histograms, one per command
    /// type.
    pub(crate) handle_command_duration_seconds: CommandMetrics<Histogram>,
    /// `mz_index_peek_total_seconds`; not worker-labeled, so shared by all workers.
    pub(crate) index_peek_total_seconds: Histogram,
    /// `mz_index_peek_seek_fulfillment_seconds`; shared by all workers.
    pub(crate) index_peek_seek_fulfillment_seconds: Histogram,
    /// `mz_index_peek_error_scan_seconds`; shared by all workers.
    pub(crate) index_peek_error_scan_seconds: Histogram,
    /// `mz_index_peek_cursor_setup_seconds`; shared by all workers.
    pub(crate) index_peek_cursor_setup_seconds: Histogram,
    /// `mz_index_peek_row_iteration_seconds`; shared by all workers.
    pub(crate) index_peek_row_iteration_seconds: Histogram,
    /// `mz_index_peek_result_sort_seconds`; shared by all workers.
    pub(crate) index_peek_result_sort_seconds: Histogram,
    /// `mz_index_peek_frontier_check_seconds`; shared by all workers.
    pub(crate) index_peek_frontier_check_seconds: Histogram,
    /// `mz_index_peek_row_collection_seconds`; shared by all workers.
    pub(crate) index_peek_row_collection_seconds: Histogram,
    /// This worker's `mz_dataflow_replica_expiration_timestamp_seconds` gauge.
    pub(crate) replica_expiration_timestamp_seconds: UIntGauge,
    /// This worker's `mz_dataflow_replica_expiration_remaining_seconds` gauge.
    pub(crate) replica_expiration_remaining_seconds: raw::Gauge,
    /// This worker's `mz_dataflow_shared_row_heap_capacity_bytes` gauge, updated by
    /// `record_shared_row_metrics`.
    shared_row_heap_capacity_bytes: UIntGauge,
}
353
354impl WorkerMetrics {
355 pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
356 let command_counts = CommandMetrics::build(|typ| {
357 self.metrics
358 .history_command_count
359 .with_label_values(&[self.worker_label.as_ref(), typ])
360 });
361 let dataflow_count = self
362 .metrics
363 .history_dataflow_count
364 .with_label_values(&[&self.worker_label]);
365
366 HistoryMetrics {
367 command_counts,
368 dataflow_count,
369 }
370 }
371
372 pub fn record_dataflow_reconciliation(
386 &self,
387 compatible: bool,
388 uncompacted: bool,
389 subscribe_free: bool,
390 copy_to_free: bool,
391 dependencies_retained: bool,
392 ) {
393 if !compatible {
394 self.metrics
395 .reconciliation_replaced_dataflows_count_total
396 .with_label_values(&[self.worker_label.as_ref(), "incompatible"])
397 .inc();
398 } else if !uncompacted {
399 self.metrics
400 .reconciliation_replaced_dataflows_count_total
401 .with_label_values(&[self.worker_label.as_ref(), "compacted"])
402 .inc();
403 } else if !subscribe_free {
404 self.metrics
405 .reconciliation_replaced_dataflows_count_total
406 .with_label_values(&[self.worker_label.as_ref(), "subscribe"])
407 .inc();
408 } else if !copy_to_free {
409 self.metrics
410 .reconciliation_replaced_dataflows_count_total
411 .with_label_values(&[self.worker_label.as_ref(), "copy-to"])
412 .inc();
413 } else if !dependencies_retained {
414 self.metrics
415 .reconciliation_replaced_dataflows_count_total
416 .with_label_values(&[self.worker_label.as_ref(), "dependency"])
417 .inc();
418 } else {
419 self.metrics
420 .reconciliation_reused_dataflows_count_total
421 .with_label_values(&[&self.worker_label])
422 .inc();
423 }
424 }
425
426 pub fn record_shared_row_metrics(&self) {
428 let binding = SharedRow::get();
429 self.shared_row_heap_capacity_bytes
430 .set(u64::cast_from(binding.byte_capacity()));
431 }
432
433 fn inc_collection_count(&self, collection_type: &str, hydrated: bool) {
435 let hydrated = if hydrated { "1" } else { "0" };
436 self.metrics
437 .collection_count
438 .with_label_values(&[self.worker_label.as_ref(), collection_type, hydrated])
439 .inc();
440 }
441
442 fn dec_collection_count(&self, collection_type: &str, hydrated: bool) {
444 let hydrated = if hydrated { "1" } else { "0" };
445 self.metrics
446 .collection_count
447 .with_label_values(&[self.worker_label.as_ref(), collection_type, hydrated])
448 .dec();
449 }
450
451 pub fn inc_subscribe_snapshot_optimization(&self) {
452 self.metrics.subscribe_snapshots_skipped_total.inc()
453 }
454
455 pub fn set_workload_class(&self, workload_class: Option<String>) {
457 self.metrics.set_workload_class(workload_class);
458 }
459
460 pub fn for_collection(&self, id: GlobalId) -> CollectionMetrics {
461 CollectionMetrics::new(id, self.clone())
462 }
463}
464
465#[derive(Clone, Debug)]
471pub struct CollectionMetrics {
472 metrics: WorkerMetrics,
473 collection_type: &'static str,
474 collection_hydrated: bool,
475}
476
477impl CollectionMetrics {
478 pub fn new(collection_id: GlobalId, metrics: WorkerMetrics) -> Self {
479 let collection_type = match collection_id {
480 GlobalId::System(_) => "system",
481 GlobalId::IntrospectionSourceIndex(_) => "log",
482 GlobalId::User(_) => "user",
483 GlobalId::Transient(_) => "transient",
484 GlobalId::Explain => "explain",
485 };
486 let collection_hydrated = false;
487
488 metrics.inc_collection_count(collection_type, collection_hydrated);
489
490 Self {
491 metrics,
492 collection_type,
493 collection_hydrated,
494 }
495 }
496
497 pub fn record_collection_hydrated(&mut self) {
499 if self.collection_hydrated {
500 return;
501 }
502
503 self.metrics
504 .dec_collection_count(self.collection_type, false);
505 self.metrics
506 .inc_collection_count(self.collection_type, true);
507 self.collection_hydrated = true;
508 }
509}
510
511impl Drop for CollectionMetrics {
512 fn drop(&mut self) {
513 self.metrics
514 .dec_collection_count(self.collection_type, self.collection_hydrated);
515 }
516}