Skip to main content

mz_compute/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::{Arc, Mutex};
11
12use mz_compute_client::metrics::{CommandMetrics, HistoryMetrics};
13use mz_ore::cast::CastFrom;
14use mz_ore::metric;
15use mz_ore::metrics::{MetricTag, MetricVisibility, MetricsRegistry, UIntGauge, raw};
16use mz_repr::{GlobalId, SharedRow};
17use prometheus::core::{AtomicF64, GenericCounter};
18use prometheus::proto::LabelPair;
19use prometheus::{Histogram, HistogramVec, IntCounter};
20
/// Metrics exposed by compute replicas.
//
// Most of the metrics here use the `raw` implementations, rather than the `DeleteOnDrop` wrappers,
// because their labels are fixed throughout the lifetime of the replica process. For example, any
// metric labeled only by `worker_id` can be `raw` since the number of workers cannot change.
//
// Metrics that are labelled by a dimension that can change throughout the lifetime of the process
// (such as `collection_id`) MUST NOT use the `raw` metric types and must use the `DeleteOnDrop`
// types instead, to avoid memory leaks.
#[derive(Clone, Debug)]
pub struct ComputeMetrics {
    // Optional workload class label to apply to all metrics in registry. The `Arc` is shared
    // with the registry postprocessor installed by `register_with`, which reads the current
    // value each time it runs; `None` means no label is added.
    workload_class: Arc<Mutex<Option<String>>>,

    // command history
    //
    // Labels: `worker_id`, `command_type`.
    history_command_count: raw::UIntGaugeVec,
    // Labels: `worker_id`.
    history_dataflow_count: raw::UIntGaugeVec,

    // reconciliation
    //
    // Labels: `worker_id`.
    reconciliation_reused_dataflows_count_total: raw::IntCounterVec,
    // Labels: `worker_id`, `reason` (why the dataflow could not be reused).
    reconciliation_replaced_dataflows_count_total: raw::IntCounterVec,

    // arrangements
    //
    // Both labeled by `worker_id` only.
    arrangement_maintenance_seconds_total: raw::CounterVec,
    arrangement_maintenance_active_info: raw::UIntGaugeVec,

    // timings
    //
    // Note that this particular metric unfortunately takes some care to
    // interpret. It measures the duration of step_or_park calls, which
    // undesirably includes the parking. This is probably fine because we
    // regularly send progress information through persist sources, which likely
    // means the parking is capped at a second or two in practice. It also
    // doesn't do anything to let you pinpoint _which_ operator or worker isn't
    // yielding, but it should hopefully alert us when there is something to
    // look at.
    timely_step_duration_seconds: HistogramVec,
    // Labels: `worker_id`.
    persist_peek_seconds: HistogramVec,
    // Labels: `worker_id`.
    stashed_peek_seconds: HistogramVec,
    // Labels: `worker_id`, `command_type`.
    handle_command_duration_seconds: HistogramVec,

    // Index peek timing phases (per-cluster, no worker label). These are plain histograms
    // that `for_worker` clones into every `WorkerMetrics`, so all workers record into the
    // same series.
    index_peek_total_seconds: Histogram,
    index_peek_seek_fulfillment_seconds: Histogram,
    index_peek_error_scan_seconds: Histogram,
    index_peek_cursor_setup_seconds: Histogram,
    index_peek_row_iteration_seconds: Histogram,
    index_peek_result_sort_seconds: Histogram,
    index_peek_frontier_check_seconds: Histogram,
    index_peek_row_collection_seconds: Histogram,

    // memory usage
    //
    // Labels: `worker_id`.
    shared_row_heap_capacity_bytes: raw::UIntGaugeVec,

    // replica expiration
    //
    // Both labeled by `worker_id` only.
    replica_expiration_timestamp_seconds: raw::UIntGaugeVec,
    replica_expiration_remaining_seconds: raw::GaugeVec,

    // collections
    //
    // Labels: `worker_id`, `type`, `hydrated`. Collections are counted by class rather than by
    // `collection_id`, which keeps the label set bounded and makes `raw` acceptable here.
    collection_count: raw::UIntGaugeVec,

    // subscribes
    //
    // Unlabeled counter shared by all workers.
    subscribe_snapshots_skipped_total: IntCounter,
}
85
86impl ComputeMetrics {
87    pub fn register_with(registry: &MetricsRegistry) -> Self {
88        let workload_class = Arc::new(Mutex::new(None));
89
90        // Apply a `workload_class` label to all metrics in the registry when we
91        // have a known workload class.
92        registry.register_postprocessor({
93            let workload_class = Arc::clone(&workload_class);
94            move |metrics| {
95                let workload_class: Option<String> =
96                    workload_class.lock().expect("lock poisoned").clone();
97                let Some(workload_class) = workload_class else {
98                    return;
99                };
100                for metric in metrics {
101                    for metric in metric.mut_metric() {
102                        let mut label = LabelPair::default();
103                        label.set_name("workload_class".into());
104                        label.set_value(workload_class.clone());
105
106                        let mut labels = metric.take_label();
107                        labels.push(label);
108                        metric.set_label(labels);
109                    }
110                }
111            }
112        });
113
114        Self {
115            workload_class,
116            history_command_count: registry.register(metric!(
117                name: "mz_compute_replica_history_command_count",
118                help: "The number of commands in the replica's command history.",
119                var_labels: ["worker_id", "command_type"],
120            )),
121            history_dataflow_count: registry.register(metric!(
122                name: "mz_compute_replica_history_dataflow_count",
123                help: "The number of dataflows in the replica's command history.",
124                var_labels: ["worker_id"],
125                visibility: MetricVisibility::Public,
126                tags: [MetricTag::Compute],
127            )),
128            reconciliation_reused_dataflows_count_total: registry.register(metric!(
129                name: "mz_compute_reconciliation_reused_dataflows_count_total",
130                help: "The total number of dataflows that were reused during compute reconciliation.",
131                var_labels: ["worker_id"],
132            )),
133            reconciliation_replaced_dataflows_count_total: registry.register(metric!(
134                name: "mz_compute_reconciliation_replaced_dataflows_count_total",
135                help: "The total number of dataflows that were replaced during compute reconciliation.",
136                var_labels: ["worker_id", "reason"],
137            )),
138            arrangement_maintenance_seconds_total: registry.register(metric!(
139                name: "mz_arrangement_maintenance_seconds_total",
140                help: "The total time spent maintaining arrangements.",
141                var_labels: ["worker_id"],
142                visibility: MetricVisibility::Public,
143                tags: [MetricTag::Compute],
144            )),
145            arrangement_maintenance_active_info: registry.register(metric!(
146                name: "mz_arrangement_maintenance_active_info",
147                help: "Whether maintenance is currently occuring.",
148                var_labels: ["worker_id"],
149            )),
150            timely_step_duration_seconds: registry.register(metric!(
151                name: "mz_timely_step_duration_seconds",
152                help: "The time spent in each compute step_or_park call",
153                const_labels: {"cluster" => "compute"},
154                var_labels: ["worker_id"],
155                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 32.0),
156            )),
157            shared_row_heap_capacity_bytes: registry.register(metric!(
158                name: "mz_dataflow_shared_row_heap_capacity_bytes",
159                help: "The heap capacity of the shared row.",
160                var_labels: ["worker_id"],
161            )),
162            persist_peek_seconds: registry.register(metric!(
163                name: "mz_persist_peek_seconds",
164                help: "Time spent in (experimental) Persist fast-path peeks.",
165                var_labels: ["worker_id"],
166                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
167            )),
168            stashed_peek_seconds: registry.register(metric!(
169                name: "mz_stashed_peek_seconds",
170                help: "Time spent reading a peek result and stashing it in the peek result stash (aka. persist blob).",
171                var_labels: ["worker_id"],
172                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
173            )),
174            handle_command_duration_seconds: registry.register(metric!(
175                name: "mz_cluster_handle_command_duration_seconds",
176                help: "Time spent in handling commands.",
177                const_labels: {"cluster" => "compute"},
178                var_labels: ["worker_id", "command_type"],
179                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
180            )),
181            index_peek_total_seconds: registry.register(metric!(
182                name: "mz_index_peek_total_seconds",
183                help: "Total time processing index peeks, from process_peek entry to response. Excluding peeks that use the peek response stash.",
184                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
185            )),
186            index_peek_seek_fulfillment_seconds: registry.register(metric!(
187                name: "mz_index_peek_seek_fulfillment_seconds",
188                help: "Time in seek_fulfillment method including frontier checks and data collection.",
189                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
190            )),
191            index_peek_error_scan_seconds: registry.register(metric!(
192                name: "mz_index_peek_error_scan_seconds",
193                help: "Time scanning the error trace for errors.",
194                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
195            )),
196            index_peek_cursor_setup_seconds: registry.register(metric!(
197                name: "mz_index_peek_cursor_setup_seconds",
198                help: "Time setting up cursor and literal constraints.",
199                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
200            )),
201            index_peek_row_iteration_seconds: registry.register(metric!(
202                name: "mz_index_peek_row_iteration_seconds",
203                help: "Time iterating rows and evaluating MFP.",
204                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
205            )),
206            index_peek_result_sort_seconds: registry.register(metric!(
207                name: "mz_index_peek_result_sort_seconds",
208                help: "Time sorting intermediate results during peek collection.",
209                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
210            )),
211            index_peek_frontier_check_seconds: registry.register(metric!(
212                name: "mz_index_peek_frontier_check_seconds",
213                help: "Time checking trace frontiers.",
214                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
215            )),
216            index_peek_row_collection_seconds: registry.register(metric!(
217                name: "mz_index_peek_row_collection_seconds",
218                help: "Time constructing RowCollection from peek results.",
219                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
220            )),
221            replica_expiration_timestamp_seconds: registry.register(metric!(
222                name: "mz_dataflow_replica_expiration_timestamp_seconds",
223                help: "The replica expiration timestamp in seconds since epoch.",
224                var_labels: ["worker_id"],
225            )),
226            replica_expiration_remaining_seconds: registry.register(metric!(
227                name: "mz_dataflow_replica_expiration_remaining_seconds",
228                help: "The remaining seconds until replica expiration. Can go negative, can lag behind.",
229                var_labels: ["worker_id"],
230            )),
231            collection_count: registry.register(metric!(
232                name: "mz_compute_collection_count",
233                help: "The number and hydration status of maintained compute collections.",
234                var_labels: ["worker_id", "type", "hydrated"],
235            )),
236            subscribe_snapshots_skipped_total: registry.register(metric!(
237                name: "mz_subscribe_snapshots_skipped_total",
238                help: "The number of collection snapshots that were skipped by the subscribe snapshot optimization.",
239            )),
240        }
241    }
242
243    /// Sets the workload class for the compute metrics.
244    pub fn set_workload_class(&self, workload_class: Option<String>) {
245        let mut guard = self.workload_class.lock().expect("lock poisoned");
246        *guard = workload_class
247    }
248
249    pub fn for_worker(&self, worker_id: usize) -> WorkerMetrics {
250        let worker = worker_id.to_string();
251        let arrangement_maintenance_seconds_total = self
252            .arrangement_maintenance_seconds_total
253            .with_label_values(&[&worker]);
254        let arrangement_maintenance_active_info = self
255            .arrangement_maintenance_active_info
256            .with_label_values(&[&worker]);
257        let timely_step_duration_seconds = self
258            .timely_step_duration_seconds
259            .with_label_values(&[&worker]);
260        let persist_peek_seconds = self.persist_peek_seconds.with_label_values(&[&worker]);
261        let stashed_peek_seconds = self.stashed_peek_seconds.with_label_values(&[&worker]);
262        let handle_command_duration_seconds = CommandMetrics::build(|typ| {
263            self.handle_command_duration_seconds
264                .with_label_values(&[worker.as_ref(), typ])
265        });
266        let index_peek_total_seconds = self.index_peek_total_seconds.clone();
267        let index_peek_seek_fulfillment_seconds = self.index_peek_seek_fulfillment_seconds.clone();
268        let index_peek_error_scan_seconds = self.index_peek_error_scan_seconds.clone();
269        let index_peek_cursor_setup_seconds = self.index_peek_cursor_setup_seconds.clone();
270        let index_peek_row_iteration_seconds = self.index_peek_row_iteration_seconds.clone();
271        let index_peek_result_sort_seconds = self.index_peek_result_sort_seconds.clone();
272        let index_peek_frontier_check_seconds = self.index_peek_frontier_check_seconds.clone();
273        let index_peek_row_collection_seconds = self.index_peek_row_collection_seconds.clone();
274        let replica_expiration_timestamp_seconds = self
275            .replica_expiration_timestamp_seconds
276            .with_label_values(&[&worker]);
277        let replica_expiration_remaining_seconds = self
278            .replica_expiration_remaining_seconds
279            .with_label_values(&[&worker]);
280        let shared_row_heap_capacity_bytes = self
281            .shared_row_heap_capacity_bytes
282            .with_label_values(&[&worker]);
283
284        WorkerMetrics {
285            worker_label: worker,
286            metrics: self.clone(),
287            arrangement_maintenance_seconds_total,
288            arrangement_maintenance_active_info,
289            timely_step_duration_seconds,
290            persist_peek_seconds,
291            stashed_peek_seconds,
292            handle_command_duration_seconds,
293            index_peek_total_seconds,
294            index_peek_seek_fulfillment_seconds,
295            index_peek_error_scan_seconds,
296            index_peek_cursor_setup_seconds,
297            index_peek_row_iteration_seconds,
298            index_peek_result_sort_seconds,
299            index_peek_frontier_check_seconds,
300            index_peek_row_collection_seconds,
301            replica_expiration_timestamp_seconds,
302            replica_expiration_remaining_seconds,
303            shared_row_heap_capacity_bytes,
304        }
305    }
306}
307
/// Per-worker metrics.
///
/// Created by [`ComputeMetrics::for_worker`]. Most fields are this worker's series of a
/// `worker_id`-labeled metric vector; the `index_peek_*` histograms are the unlabeled,
/// cluster-wide histograms shared by all workers.
#[derive(Clone, Debug)]
pub struct WorkerMetrics {
    /// Value of the `worker_id` label for this worker.
    worker_label: String,
    /// The replica-wide metrics, used to resolve further labeled series on demand
    /// (command history, reconciliation, and collection counts).
    metrics: ComputeMetrics,

    /// The amount of time spent in arrangement maintenance.
    pub(crate) arrangement_maintenance_seconds_total: GenericCounter<AtomicF64>,
    /// 1 if this worker is currently doing maintenance.
    ///
    /// If maintenance turns out to take a very long time, this will allow us
    /// to gain a sense that Materialize is stuck on maintenance before the
    /// maintenance completes.
    pub(crate) arrangement_maintenance_active_info: UIntGauge,
    /// Histogram of Timely step timings.
    pub(crate) timely_step_duration_seconds: Histogram,
    /// Histogram of persist peek durations.
    pub(crate) persist_peek_seconds: Histogram,
    /// Histogram of stashed peek durations.
    pub(crate) stashed_peek_seconds: Histogram,
    /// Histograms of command handling durations, one per command type.
    pub(crate) handle_command_duration_seconds: CommandMetrics<Histogram>,
    /// Histogram of total index peek durations.
    pub(crate) index_peek_total_seconds: Histogram,
    /// Histogram of index peek seek_fulfillment durations.
    pub(crate) index_peek_seek_fulfillment_seconds: Histogram,
    /// Histogram of index peek error scan durations.
    pub(crate) index_peek_error_scan_seconds: Histogram,
    /// Histogram of index peek cursor setup durations.
    pub(crate) index_peek_cursor_setup_seconds: Histogram,
    /// Histogram of index peek row iteration durations.
    pub(crate) index_peek_row_iteration_seconds: Histogram,
    /// Histogram of index peek result sort durations.
    pub(crate) index_peek_result_sort_seconds: Histogram,
    /// Histogram of index peek frontier check durations.
    pub(crate) index_peek_frontier_check_seconds: Histogram,
    /// Histogram of index peek row collection construction durations.
    pub(crate) index_peek_row_collection_seconds: Histogram,
    /// The timestamp of replica expiration, in seconds since epoch.
    pub(crate) replica_expiration_timestamp_seconds: UIntGauge,
    /// Remaining seconds until replica expiration. Can go negative.
    pub(crate) replica_expiration_remaining_seconds: raw::Gauge,
    /// Heap capacity of the shared row, in bytes.
    shared_row_heap_capacity_bytes: UIntGauge,
}
353
354impl WorkerMetrics {
355    pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
356        let command_counts = CommandMetrics::build(|typ| {
357            self.metrics
358                .history_command_count
359                .with_label_values(&[self.worker_label.as_ref(), typ])
360        });
361        let dataflow_count = self
362            .metrics
363            .history_dataflow_count
364            .with_label_values(&[&self.worker_label]);
365
366        HistoryMetrics {
367            command_counts,
368            dataflow_count,
369        }
370    }
371
372    /// Record the reconciliation result for a single dataflow.
373    ///
374    /// Reconciliation is recorded as successful if the given properties all hold. Otherwise it is
375    /// recorded as unsuccessful, with a reason based on the first property that does not hold.
376    ///
377    /// The properties are:
378    ///  * compatible: The old and new dataflow descriptions are compatible.
379    ///  * uncompacted: Collections currently installed for the dataflow exports have not been
380    ///                 allowed to compact beyond that new dataflow as-of.
381    ///  * subscribe_free: The dataflow does not export a subscribe sink.
382    ///  * copy_to_free: The dataflow does not export a copy-to sink.
383    ///  * dependencies_retained: All local inputs to the dataflow were retained by compute
384    ///                           reconciliation.
385    pub fn record_dataflow_reconciliation(
386        &self,
387        compatible: bool,
388        uncompacted: bool,
389        subscribe_free: bool,
390        copy_to_free: bool,
391        dependencies_retained: bool,
392    ) {
393        if !compatible {
394            self.metrics
395                .reconciliation_replaced_dataflows_count_total
396                .with_label_values(&[self.worker_label.as_ref(), "incompatible"])
397                .inc();
398        } else if !uncompacted {
399            self.metrics
400                .reconciliation_replaced_dataflows_count_total
401                .with_label_values(&[self.worker_label.as_ref(), "compacted"])
402                .inc();
403        } else if !subscribe_free {
404            self.metrics
405                .reconciliation_replaced_dataflows_count_total
406                .with_label_values(&[self.worker_label.as_ref(), "subscribe"])
407                .inc();
408        } else if !copy_to_free {
409            self.metrics
410                .reconciliation_replaced_dataflows_count_total
411                .with_label_values(&[self.worker_label.as_ref(), "copy-to"])
412                .inc();
413        } else if !dependencies_retained {
414            self.metrics
415                .reconciliation_replaced_dataflows_count_total
416                .with_label_values(&[self.worker_label.as_ref(), "dependency"])
417                .inc();
418        } else {
419            self.metrics
420                .reconciliation_reused_dataflows_count_total
421                .with_label_values(&[&self.worker_label])
422                .inc();
423        }
424    }
425
426    /// Record the heap capacity of the shared row.
427    pub fn record_shared_row_metrics(&self) {
428        let binding = SharedRow::get();
429        self.shared_row_heap_capacity_bytes
430            .set(u64::cast_from(binding.byte_capacity()));
431    }
432
433    /// Increase the count of maintained collections.
434    fn inc_collection_count(&self, collection_type: &str, hydrated: bool) {
435        let hydrated = if hydrated { "1" } else { "0" };
436        self.metrics
437            .collection_count
438            .with_label_values(&[self.worker_label.as_ref(), collection_type, hydrated])
439            .inc();
440    }
441
442    /// Decrease the count of maintained collections.
443    fn dec_collection_count(&self, collection_type: &str, hydrated: bool) {
444        let hydrated = if hydrated { "1" } else { "0" };
445        self.metrics
446            .collection_count
447            .with_label_values(&[self.worker_label.as_ref(), collection_type, hydrated])
448            .dec();
449    }
450
451    pub fn inc_subscribe_snapshot_optimization(&self) {
452        self.metrics.subscribe_snapshots_skipped_total.inc()
453    }
454
455    /// Sets the workload class for the compute metrics.
456    pub fn set_workload_class(&self, workload_class: Option<String>) {
457        self.metrics.set_workload_class(workload_class);
458    }
459
460    pub fn for_collection(&self, id: GlobalId) -> CollectionMetrics {
461        CollectionMetrics::new(id, self.clone())
462    }
463}
464
465/// Collection metrics.
466///
467/// Note that these metrics do _not_ have a `collection_id` label. We avoid introducing
468/// per-collection, per-worker metrics because the number of resulting time series would
469/// potentially be huge. Instead we count classes of collections, such as hydrated collections.
470#[derive(Clone, Debug)]
471pub struct CollectionMetrics {
472    metrics: WorkerMetrics,
473    collection_type: &'static str,
474    collection_hydrated: bool,
475}
476
477impl CollectionMetrics {
478    pub fn new(collection_id: GlobalId, metrics: WorkerMetrics) -> Self {
479        let collection_type = match collection_id {
480            GlobalId::System(_) => "system",
481            GlobalId::IntrospectionSourceIndex(_) => "log",
482            GlobalId::User(_) => "user",
483            GlobalId::Transient(_) => "transient",
484            GlobalId::Explain => "explain",
485        };
486        let collection_hydrated = false;
487
488        metrics.inc_collection_count(collection_type, collection_hydrated);
489
490        Self {
491            metrics,
492            collection_type,
493            collection_hydrated,
494        }
495    }
496
497    /// Record this collection as hydration.
498    pub fn record_collection_hydrated(&mut self) {
499        if self.collection_hydrated {
500            return;
501        }
502
503        self.metrics
504            .dec_collection_count(self.collection_type, false);
505        self.metrics
506            .inc_collection_count(self.collection_type, true);
507        self.collection_hydrated = true;
508    }
509}
510
511impl Drop for CollectionMetrics {
512    fn drop(&mut self) {
513        self.metrics
514            .dec_collection_count(self.collection_type, self.collection_hydrated);
515    }
516}