mz_compute/metrics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::{Arc, Mutex};

use mz_compute_client::metrics::{CommandMetrics, HistoryMetrics};
use mz_ore::cast::CastFrom;
use mz_ore::metric;
use mz_ore::metrics::{MetricsRegistry, UIntGauge, raw};
use mz_repr::{GlobalId, SharedRow};
use prometheus::core::{AtomicF64, GenericCounter};
use prometheus::proto::LabelPair;
use prometheus::{Histogram, HistogramVec, IntCounter};

/// Metrics exposed by compute replicas.
//
// Most of the metrics here use the `raw` implementations, rather than the `DeleteOnDrop`
// wrappers, because their labels are fixed throughout the lifetime of the replica process. For
// example, any metric labeled only by `worker_id` can be `raw` since the number of workers
// cannot change.
//
// Metrics that are labeled by a dimension that can change throughout the lifetime of the process
// (such as `collection_id`) MUST NOT use the `raw` metric types and must use the `DeleteOnDrop`
// types instead, to avoid memory leaks.
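//
// As an illustration (a hedged sketch, not a field of this struct): a hypothetical
// per-collection gauge would use one of the `DeleteOnDrop` wrappers from
// `mz_ore::metrics`, so that the label set for a given `collection_id` is removed
// from the registry when the gauge is dropped:
//
//     // Hypothetical, for illustration only:
//     // collection_frontier: DeleteOnDropGauge<AtomicU64, Vec<String>>,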
#[derive(Clone, Debug)]
pub struct ComputeMetrics {
    // Optional workload class label to apply to all metrics in the registry.
    workload_class: Arc<Mutex<Option<String>>>,

    // command history
    history_command_count: raw::UIntGaugeVec,
    history_dataflow_count: raw::UIntGaugeVec,

    // reconciliation
    reconciliation_reused_dataflows_count_total: raw::IntCounterVec,
    reconciliation_replaced_dataflows_count_total: raw::IntCounterVec,

    // arrangements
    arrangement_maintenance_seconds_total: raw::CounterVec,
    arrangement_maintenance_active_info: raw::UIntGaugeVec,

    // timings
    //
    // Note that this particular metric unfortunately takes some care to
    // interpret. It measures the duration of `step_or_park` calls, which
    // undesirably includes the parking. This is probably fine because we
    // regularly send progress information through persist sources, which likely
    // means the parking is capped at a second or two in practice. It also
    // doesn't do anything to let you pinpoint _which_ operator or worker isn't
    // yielding, but it should hopefully alert us when there is something to
    // look at.
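    //
    // A hedged sketch of the kind of call site being measured (the names here are
    // assumptions, not code from this module): the duration of each Timely
    // `step_or_park` call is observed into this histogram, parking included.
    //
    //     let start = std::time::Instant::now();
    //     timely_worker.step_or_park(Some(std::time::Duration::from_secs(1)));
    //     worker_metrics
    //         .timely_step_duration_seconds
    //         .observe(start.elapsed().as_secs_f64());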
    timely_step_duration_seconds: HistogramVec,
    persist_peek_seconds: HistogramVec,
    stashed_peek_seconds: HistogramVec,
    handle_command_duration_seconds: HistogramVec,

    // Index peek timing phases (per-cluster, no worker label)
    index_peek_total_seconds: Histogram,
    index_peek_seek_fulfillment_seconds: Histogram,
    index_peek_error_scan_seconds: Histogram,
    index_peek_cursor_setup_seconds: Histogram,
    index_peek_row_iteration_seconds: Histogram,
    index_peek_result_sort_seconds: Histogram,
    index_peek_frontier_check_seconds: Histogram,
    index_peek_row_collection_seconds: Histogram,

    // memory usage
    shared_row_heap_capacity_bytes: raw::UIntGaugeVec,

    // replica expiration
    replica_expiration_timestamp_seconds: raw::UIntGaugeVec,
    replica_expiration_remaining_seconds: raw::GaugeVec,

    // collections
    collection_count: raw::UIntGaugeVec,

    // subscribes
    subscribe_snapshots_skipped_total: IntCounter,
}

impl ComputeMetrics {
    /// Registers the compute metrics with the given registry.
    pub fn register_with(registry: &MetricsRegistry) -> Self {
        let workload_class = Arc::new(Mutex::new(None));

        // Apply a `workload_class` label to all metrics in the registry when we
        // have a known workload class.
        registry.register_postprocessor({
            let workload_class = Arc::clone(&workload_class);
            move |metrics| {
                let workload_class: Option<String> =
                    workload_class.lock().expect("lock poisoned").clone();
                let Some(workload_class) = workload_class else {
                    return;
                };
                for metric in metrics {
                    for metric in metric.mut_metric() {
                        let mut label = LabelPair::default();
                        label.set_name("workload_class".into());
                        label.set_value(workload_class.clone());

                        let mut labels = metric.take_label();
                        labels.push(label);
                        metric.set_label(labels);
                    }
                }
            }
        });
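
        // For example (an illustration, not code from this module): once the workload
        // class is set to "analytics", a series that would otherwise be exported as
        // `mz_compute_collection_count{worker_id="0",...}` is exported as
        // `mz_compute_collection_count{worker_id="0",...,workload_class="analytics"}`.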

        Self {
            workload_class,
            history_command_count: registry.register(metric!(
                name: "mz_compute_replica_history_command_count",
                help: "The number of commands in the replica's command history.",
                var_labels: ["worker_id", "command_type"],
            )),
            history_dataflow_count: registry.register(metric!(
                name: "mz_compute_replica_history_dataflow_count",
                help: "The number of dataflows in the replica's command history.",
                var_labels: ["worker_id"],
            )),
            reconciliation_reused_dataflows_count_total: registry.register(metric!(
                name: "mz_compute_reconciliation_reused_dataflows_count_total",
                help: "The total number of dataflows that were reused during compute reconciliation.",
                var_labels: ["worker_id"],
            )),
            reconciliation_replaced_dataflows_count_total: registry.register(metric!(
                name: "mz_compute_reconciliation_replaced_dataflows_count_total",
                help: "The total number of dataflows that were replaced during compute reconciliation.",
                var_labels: ["worker_id", "reason"],
            )),
            arrangement_maintenance_seconds_total: registry.register(metric!(
                name: "mz_arrangement_maintenance_seconds_total",
                help: "The total time spent maintaining arrangements.",
                var_labels: ["worker_id"],
            )),
            arrangement_maintenance_active_info: registry.register(metric!(
                name: "mz_arrangement_maintenance_active_info",
                help: "Whether maintenance is currently occurring.",
                var_labels: ["worker_id"],
            )),
            timely_step_duration_seconds: registry.register(metric!(
                name: "mz_timely_step_duration_seconds",
                help: "The time spent in each compute step_or_park call.",
                const_labels: {"cluster" => "compute"},
                var_labels: ["worker_id"],
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 32.0),
            )),
            shared_row_heap_capacity_bytes: registry.register(metric!(
                name: "mz_dataflow_shared_row_heap_capacity_bytes",
                help: "The heap capacity of the shared row.",
                var_labels: ["worker_id"],
            )),
            persist_peek_seconds: registry.register(metric!(
                name: "mz_persist_peek_seconds",
                help: "Time spent in (experimental) Persist fast-path peeks.",
                var_labels: ["worker_id"],
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            stashed_peek_seconds: registry.register(metric!(
                name: "mz_stashed_peek_seconds",
                help: "Time spent reading a peek result and stashing it in the peek result stash (a.k.a. persist blob).",
                var_labels: ["worker_id"],
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            handle_command_duration_seconds: registry.register(metric!(
                name: "mz_cluster_handle_command_duration_seconds",
                help: "Time spent in handling commands.",
                const_labels: {"cluster" => "compute"},
                var_labels: ["worker_id", "command_type"],
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            index_peek_total_seconds: registry.register(metric!(
                name: "mz_index_peek_total_seconds",
                help: "Total time processing index peeks, from process_peek entry to response, excluding peeks that use the peek response stash.",
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            index_peek_seek_fulfillment_seconds: registry.register(metric!(
                name: "mz_index_peek_seek_fulfillment_seconds",
                help: "Time in the seek_fulfillment method, including frontier checks and data collection.",
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            index_peek_error_scan_seconds: registry.register(metric!(
                name: "mz_index_peek_error_scan_seconds",
                help: "Time scanning the error trace for errors.",
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            index_peek_cursor_setup_seconds: registry.register(metric!(
                name: "mz_index_peek_cursor_setup_seconds",
                help: "Time setting up cursor and literal constraints.",
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            index_peek_row_iteration_seconds: registry.register(metric!(
                name: "mz_index_peek_row_iteration_seconds",
                help: "Time iterating rows and evaluating MFP.",
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            index_peek_result_sort_seconds: registry.register(metric!(
                name: "mz_index_peek_result_sort_seconds",
                help: "Time sorting intermediate results during peek collection.",
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            index_peek_frontier_check_seconds: registry.register(metric!(
                name: "mz_index_peek_frontier_check_seconds",
                help: "Time checking trace frontiers.",
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            index_peek_row_collection_seconds: registry.register(metric!(
                name: "mz_index_peek_row_collection_seconds",
                help: "Time constructing RowCollection from peek results.",
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            replica_expiration_timestamp_seconds: registry.register(metric!(
                name: "mz_dataflow_replica_expiration_timestamp_seconds",
                help: "The replica expiration timestamp in seconds since epoch.",
                var_labels: ["worker_id"],
            )),
            replica_expiration_remaining_seconds: registry.register(metric!(
                name: "mz_dataflow_replica_expiration_remaining_seconds",
                help: "The remaining seconds until replica expiration. Can go negative and can lag behind.",
                var_labels: ["worker_id"],
            )),
            collection_count: registry.register(metric!(
                name: "mz_compute_collection_count",
                help: "The number and hydration status of maintained compute collections.",
                var_labels: ["worker_id", "type", "hydrated"],
            )),
            subscribe_snapshots_skipped_total: registry.register(metric!(
                name: "mz_subscribe_snapshots_skipped_total",
                help: "The number of collection snapshots that were skipped by the subscribe snapshot optimization.",
            )),
        }
    }
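
    // A minimal registration sketch (hedged; assumes `MetricsRegistry::new` from
    // `mz_ore::metrics` and a workload class of "analytics"):
    //
    //     let registry = MetricsRegistry::new();
    //     let metrics = ComputeMetrics::register_with(&registry);
    //     metrics.set_workload_class(Some("analytics".into()));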

    /// Sets the workload class for the compute metrics.
    pub fn set_workload_class(&self, workload_class: Option<String>) {
        let mut guard = self.workload_class.lock().expect("lock poisoned");
        *guard = workload_class;
    }

    /// Returns the per-worker metrics for the worker with the given ID.
    pub fn for_worker(&self, worker_id: usize) -> WorkerMetrics {
        let worker = worker_id.to_string();
        let arrangement_maintenance_seconds_total = self
            .arrangement_maintenance_seconds_total
            .with_label_values(&[&worker]);
        let arrangement_maintenance_active_info = self
            .arrangement_maintenance_active_info
            .with_label_values(&[&worker]);
        let timely_step_duration_seconds = self
            .timely_step_duration_seconds
            .with_label_values(&[&worker]);
        let persist_peek_seconds = self.persist_peek_seconds.with_label_values(&[&worker]);
        let stashed_peek_seconds = self.stashed_peek_seconds.with_label_values(&[&worker]);
        let handle_command_duration_seconds = CommandMetrics::build(|typ| {
            self.handle_command_duration_seconds
                .with_label_values(&[worker.as_ref(), typ])
        });
        let index_peek_total_seconds = self.index_peek_total_seconds.clone();
        let index_peek_seek_fulfillment_seconds = self.index_peek_seek_fulfillment_seconds.clone();
        let index_peek_error_scan_seconds = self.index_peek_error_scan_seconds.clone();
        let index_peek_cursor_setup_seconds = self.index_peek_cursor_setup_seconds.clone();
        let index_peek_row_iteration_seconds = self.index_peek_row_iteration_seconds.clone();
        let index_peek_result_sort_seconds = self.index_peek_result_sort_seconds.clone();
        let index_peek_frontier_check_seconds = self.index_peek_frontier_check_seconds.clone();
        let index_peek_row_collection_seconds = self.index_peek_row_collection_seconds.clone();
        let replica_expiration_timestamp_seconds = self
            .replica_expiration_timestamp_seconds
            .with_label_values(&[&worker]);
        let replica_expiration_remaining_seconds = self
            .replica_expiration_remaining_seconds
            .with_label_values(&[&worker]);
        let shared_row_heap_capacity_bytes = self
            .shared_row_heap_capacity_bytes
            .with_label_values(&[&worker]);

        WorkerMetrics {
            worker_label: worker,
            metrics: self.clone(),
            arrangement_maintenance_seconds_total,
            arrangement_maintenance_active_info,
            timely_step_duration_seconds,
            persist_peek_seconds,
            stashed_peek_seconds,
            handle_command_duration_seconds,
            index_peek_total_seconds,
            index_peek_seek_fulfillment_seconds,
            index_peek_error_scan_seconds,
            index_peek_cursor_setup_seconds,
            index_peek_row_iteration_seconds,
            index_peek_result_sort_seconds,
            index_peek_frontier_check_seconds,
            index_peek_row_collection_seconds,
            replica_expiration_timestamp_seconds,
            replica_expiration_remaining_seconds,
            shared_row_heap_capacity_bytes,
        }
    }
}
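
// A minimal per-worker wiring sketch (hedged; `timely_worker` and its `index()`
// method are assumptions standing in for however the caller obtains the Timely
// worker index):
//
//     let worker_metrics = metrics.for_worker(timely_worker.index());
//     let history_metrics = worker_metrics.for_history();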

/// Per-worker metrics.
#[derive(Clone, Debug)]
pub struct WorkerMetrics {
    worker_label: String,
    metrics: ComputeMetrics,

    /// The amount of time spent in arrangement maintenance.
    pub(crate) arrangement_maintenance_seconds_total: GenericCounter<AtomicF64>,
    /// 1 if this worker is currently doing maintenance.
    ///
    /// If maintenance turns out to take a very long time, this will allow us
    /// to gain a sense that Materialize is stuck on maintenance before the
    /// maintenance completes.
    pub(crate) arrangement_maintenance_active_info: UIntGauge,
    /// Histogram of Timely step timings.
    pub(crate) timely_step_duration_seconds: Histogram,
    /// Histogram of persist peek durations.
    pub(crate) persist_peek_seconds: Histogram,
    /// Histogram of stashed peek durations.
    pub(crate) stashed_peek_seconds: Histogram,
    /// Histogram of command handling durations.
    pub(crate) handle_command_duration_seconds: CommandMetrics<Histogram>,
    /// Histogram of total index peek durations.
    pub(crate) index_peek_total_seconds: Histogram,
    /// Histogram of index peek seek_fulfillment durations.
    pub(crate) index_peek_seek_fulfillment_seconds: Histogram,
    /// Histogram of index peek error scan durations.
    pub(crate) index_peek_error_scan_seconds: Histogram,
    /// Histogram of index peek cursor setup durations.
    pub(crate) index_peek_cursor_setup_seconds: Histogram,
    /// Histogram of index peek row iteration durations.
    pub(crate) index_peek_row_iteration_seconds: Histogram,
    /// Histogram of index peek result sort durations.
    pub(crate) index_peek_result_sort_seconds: Histogram,
    /// Histogram of index peek frontier check durations.
    pub(crate) index_peek_frontier_check_seconds: Histogram,
    /// Histogram of index peek row collection construction durations.
    pub(crate) index_peek_row_collection_seconds: Histogram,
    /// The timestamp of replica expiration.
    pub(crate) replica_expiration_timestamp_seconds: UIntGauge,
    /// Remaining seconds until replica expiration.
    pub(crate) replica_expiration_remaining_seconds: raw::Gauge,
    /// Heap capacity of the shared row.
    shared_row_heap_capacity_bytes: UIntGauge,
}

impl WorkerMetrics {
    /// Returns metrics for tracking the contents of the replica's command history.
    pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
        let command_counts = CommandMetrics::build(|typ| {
            self.metrics
                .history_command_count
                .with_label_values(&[self.worker_label.as_ref(), typ])
        });
        let dataflow_count = self
            .metrics
            .history_dataflow_count
            .with_label_values(&[&self.worker_label]);

        HistoryMetrics {
            command_counts,
            dataflow_count,
        }
    }

    /// Record the reconciliation result for a single dataflow.
    ///
    /// Reconciliation is recorded as successful if the given properties all hold. Otherwise it is
    /// recorded as unsuccessful, with a reason based on the first property that does not hold.
    ///
    /// The properties are:
    ///  * compatible: The old and new dataflow descriptions are compatible.
    ///  * uncompacted: Collections currently installed for the dataflow exports have not been
    ///                 allowed to compact beyond the new dataflow's as-of.
    ///  * subscribe_free: The dataflow does not export a subscribe sink.
    ///  * copy_to_free: The dataflow does not export a copy-to sink.
    ///  * dependencies_retained: All local inputs to the dataflow were retained by compute
    ///                           reconciliation.
    pub fn record_dataflow_reconciliation(
        &self,
        compatible: bool,
        uncompacted: bool,
        subscribe_free: bool,
        copy_to_free: bool,
        dependencies_retained: bool,
    ) {
        if !compatible {
            self.metrics
                .reconciliation_replaced_dataflows_count_total
                .with_label_values(&[self.worker_label.as_ref(), "incompatible"])
                .inc();
        } else if !uncompacted {
            self.metrics
                .reconciliation_replaced_dataflows_count_total
                .with_label_values(&[self.worker_label.as_ref(), "compacted"])
                .inc();
        } else if !subscribe_free {
            self.metrics
                .reconciliation_replaced_dataflows_count_total
                .with_label_values(&[self.worker_label.as_ref(), "subscribe"])
                .inc();
        } else if !copy_to_free {
            self.metrics
                .reconciliation_replaced_dataflows_count_total
                .with_label_values(&[self.worker_label.as_ref(), "copy-to"])
                .inc();
        } else if !dependencies_retained {
            self.metrics
                .reconciliation_replaced_dataflows_count_total
                .with_label_values(&[self.worker_label.as_ref(), "dependency"])
                .inc();
        } else {
            self.metrics
                .reconciliation_reused_dataflows_count_total
                .with_label_values(&[&self.worker_label])
                .inc();
        }
    }
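
    // For example, a dataflow whose description matched but whose inputs had been
    // allowed to compact past the new as-of would be recorded as:
    //
    //     worker_metrics.record_dataflow_reconciliation(true, false, true, true, true);
    //     // increments ..._replaced_dataflows_count_total{reason="compacted"}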

    /// Record the heap capacity of the shared row.
    pub fn record_shared_row_metrics(&self) {
        let binding = SharedRow::get();
        self.shared_row_heap_capacity_bytes
            .set(u64::cast_from(binding.byte_capacity()));
    }

    /// Increase the count of maintained collections.
    fn inc_collection_count(&self, collection_type: &str, hydrated: bool) {
        let hydrated = if hydrated { "1" } else { "0" };
        self.metrics
            .collection_count
            .with_label_values(&[self.worker_label.as_ref(), collection_type, hydrated])
            .inc();
    }

    /// Decrease the count of maintained collections.
    fn dec_collection_count(&self, collection_type: &str, hydrated: bool) {
        let hydrated = if hydrated { "1" } else { "0" };
        self.metrics
            .collection_count
            .with_label_values(&[self.worker_label.as_ref(), collection_type, hydrated])
            .dec();
    }

    /// Increment the count of snapshots skipped by the subscribe snapshot optimization.
    pub fn inc_subscribe_snapshot_optimization(&self) {
        self.metrics.subscribe_snapshots_skipped_total.inc();
    }

    /// Sets the workload class for the compute metrics.
    pub fn set_workload_class(&self, workload_class: Option<String>) {
        self.metrics.set_workload_class(workload_class);
    }

    /// Returns metrics for the collection with the given ID.
    pub fn for_collection(&self, id: GlobalId) -> CollectionMetrics {
        CollectionMetrics::new(id, self.clone())
    }
}

/// Collection metrics.
///
/// Note that these metrics do _not_ have a `collection_id` label. We avoid introducing
/// per-collection, per-worker metrics because the number of resulting time series would
/// potentially be huge. Instead, we count classes of collections, such as hydrated collections.
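///
/// As a rough illustration (hypothetical numbers): a replica with 16 workers maintaining
/// 1,000 collections would export 16,000 time series per metric if we labeled by
/// `collection_id` and `worker_id`, whereas the labels used here (`worker_id`, `type`,
/// `hydrated`) keep the series count small and bounded.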
#[derive(Clone, Debug)]
pub struct CollectionMetrics {
    metrics: WorkerMetrics,
    collection_type: &'static str,
    collection_hydrated: bool,
}

impl CollectionMetrics {
    /// Creates metrics for the given collection, which is initially counted as unhydrated.
    pub fn new(collection_id: GlobalId, metrics: WorkerMetrics) -> Self {
        let collection_type = match collection_id {
            GlobalId::System(_) => "system",
            GlobalId::IntrospectionSourceIndex(_) => "log",
            GlobalId::User(_) => "user",
            GlobalId::Transient(_) => "transient",
            GlobalId::Explain => "explain",
        };
        let collection_hydrated = false;

        metrics.inc_collection_count(collection_type, collection_hydrated);

        Self {
            metrics,
            collection_type,
            collection_hydrated,
        }
    }

    /// Record this collection as hydrated.
    pub fn record_collection_hydrated(&mut self) {
        if self.collection_hydrated {
            return;
        }

        self.metrics
            .dec_collection_count(self.collection_type, false);
        self.metrics
            .inc_collection_count(self.collection_type, true);
        self.collection_hydrated = true;
    }
}

impl Drop for CollectionMetrics {
    fn drop(&mut self) {
        self.metrics
            .dec_collection_count(self.collection_type, self.collection_hydrated);
    }
}
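
// Lifecycle sketch (hedged; assumes a `worker_metrics: WorkerMetrics` is in scope):
// creating a collection's metrics counts it as unhydrated, marking it hydrated moves
// the count between label sets, and dropping it removes it from the count entirely.
//
//     let mut collection_metrics = worker_metrics.for_collection(GlobalId::User(1));
//     collection_metrics.record_collection_hydrated();
//     // collection_count{hydrated="0"} -= 1; collection_count{hydrated="1"} += 1
//     drop(collection_metrics);
//     // collection_count{hydrated="1"} -= 1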