// mz_compute/metrics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
10use std::sync::{Arc, Mutex};
11
12use mz_compute_client::metrics::{CommandMetrics, HistoryMetrics};
13use mz_ore::cast::CastFrom;
14use mz_ore::metric;
15use mz_ore::metrics::{MetricsRegistry, UIntGauge, raw};
16use mz_repr::{GlobalId, SharedRow};
17use prometheus::core::{AtomicF64, GenericCounter};
18use prometheus::proto::LabelPair;
19use prometheus::{Histogram, HistogramVec};
20
/// Metrics exposed by compute replicas.
//
// Most of the metrics here use the `raw` implementations, rather than the `DeleteOnDrop` wrappers
// because their labels are fixed throughout the lifetime of the replica process. For example, any
// metric labeled only by `worker_id` can be `raw` since the number of workers cannot change.
//
// Metrics that are labelled by a dimension that can change throughout the lifetime of the process
// (such as `collection_id`) MUST NOT use the `raw` metric types and must use the `DeleteOnDrop`
// types instead, to avoid memory leaks.
#[derive(Clone, Debug)]
pub struct ComputeMetrics {
    // Optional workload class label to apply to all metrics in registry.
    // `None` until `set_workload_class` provides one; read by the registry
    // postprocessor installed in `register_with`.
    workload_class: Arc<Mutex<Option<String>>>,

    // command history
    // Number of commands in the replica's command history, by worker and command type.
    history_command_count: raw::UIntGaugeVec,
    // Number of dataflows in the replica's command history, by worker.
    history_dataflow_count: raw::UIntGaugeVec,

    // reconciliation
    // Dataflows reused during compute reconciliation, by worker.
    reconciliation_reused_dataflows_count_total: raw::IntCounterVec,
    // Dataflows replaced during compute reconciliation, by worker and replacement reason.
    reconciliation_replaced_dataflows_count_total: raw::IntCounterVec,

    // arrangements
    // Total time spent in arrangement maintenance, by worker.
    arrangement_maintenance_seconds_total: raw::CounterVec,
    // 0/1 flag: whether maintenance is currently in progress, by worker.
    arrangement_maintenance_active_info: raw::UIntGaugeVec,

    // timings
    //
    // Note that this particular metric unfortunately takes some care to
    // interpret. It measures the duration of step_or_park calls, which
    // undesirably includes the parking. This is probably fine because we
    // regularly send progress information through persist sources, which likely
    // means the parking is capped at a second or two in practice. It also
    // doesn't do anything to let you pinpoint _which_ operator or worker isn't
    // yielding, but it should hopefully alert us when there is something to
    // look at.
    timely_step_duration_seconds: HistogramVec,
    // Duration of Persist fast-path peeks, by worker.
    persist_peek_seconds: HistogramVec,
    // Duration of reading and stashing peek results, by worker.
    stashed_peek_seconds: HistogramVec,
    // Duration of command handling, by worker and command type.
    handle_command_duration_seconds: HistogramVec,

    // memory usage
    // Heap capacity of the thread-local shared row, by worker.
    shared_row_heap_capacity_bytes: raw::UIntGaugeVec,

    // replica expiration
    // Absolute expiration timestamp (seconds since epoch), by worker.
    replica_expiration_timestamp_seconds: raw::UIntGaugeVec,
    // Seconds remaining until expiration (may go negative), by worker.
    replica_expiration_remaining_seconds: raw::GaugeVec,

    // collections
    // Count of maintained collections, by worker, collection type, and hydration status.
    collection_count: raw::UIntGaugeVec,
}
72
73impl ComputeMetrics {
74    pub fn register_with(registry: &MetricsRegistry) -> Self {
75        let workload_class = Arc::new(Mutex::new(None));
76
77        // Apply a `workload_class` label to all metrics in the registry when we
78        // have a known workload class.
79        registry.register_postprocessor({
80            let workload_class = Arc::clone(&workload_class);
81            move |metrics| {
82                let workload_class: Option<String> =
83                    workload_class.lock().expect("lock poisoned").clone();
84                let Some(workload_class) = workload_class else {
85                    return;
86                };
87                for metric in metrics {
88                    for metric in metric.mut_metric() {
89                        let mut label = LabelPair::default();
90                        label.set_name("workload_class".into());
91                        label.set_value(workload_class.clone());
92
93                        let mut labels = metric.take_label();
94                        labels.push(label);
95                        metric.set_label(labels);
96                    }
97                }
98            }
99        });
100
101        Self {
102            workload_class,
103            history_command_count: registry.register(metric!(
104                name: "mz_compute_replica_history_command_count",
105                help: "The number of commands in the replica's command history.",
106                var_labels: ["worker_id", "command_type"],
107            )),
108            history_dataflow_count: registry.register(metric!(
109                name: "mz_compute_replica_history_dataflow_count",
110                help: "The number of dataflows in the replica's command history.",
111                var_labels: ["worker_id"],
112            )),
113            reconciliation_reused_dataflows_count_total: registry.register(metric!(
114                name: "mz_compute_reconciliation_reused_dataflows_count_total",
115                help: "The total number of dataflows that were reused during compute reconciliation.",
116                var_labels: ["worker_id"],
117            )),
118            reconciliation_replaced_dataflows_count_total: registry.register(metric!(
119                name: "mz_compute_reconciliation_replaced_dataflows_count_total",
120                help: "The total number of dataflows that were replaced during compute reconciliation.",
121                var_labels: ["worker_id", "reason"],
122            )),
123            arrangement_maintenance_seconds_total: registry.register(metric!(
124                name: "mz_arrangement_maintenance_seconds_total",
125                help: "The total time spent maintaining arrangements.",
126                var_labels: ["worker_id"],
127            )),
128            arrangement_maintenance_active_info: registry.register(metric!(
129                name: "mz_arrangement_maintenance_active_info",
130                help: "Whether maintenance is currently occuring.",
131                var_labels: ["worker_id"],
132            )),
133            timely_step_duration_seconds: registry.register(metric!(
134                name: "mz_timely_step_duration_seconds",
135                help: "The time spent in each compute step_or_park call",
136                const_labels: {"cluster" => "compute"},
137                var_labels: ["worker_id"],
138                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 32.0),
139            )),
140            shared_row_heap_capacity_bytes: registry.register(metric!(
141                name: "mz_dataflow_shared_row_heap_capacity_bytes",
142                help: "The heap capacity of the shared row.",
143                var_labels: ["worker_id"],
144            )),
145            persist_peek_seconds: registry.register(metric!(
146                name: "mz_persist_peek_seconds",
147                help: "Time spent in (experimental) Persist fast-path peeks.",
148                var_labels: ["worker_id"],
149                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
150            )),
151            stashed_peek_seconds: registry.register(metric!(
152                name: "mz_stashed_peek_seconds",
153                help: "Time spent reading a peek result and stashing it in the peek result stash (aka. persist blob).",
154                var_labels: ["worker_id"],
155                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
156            )),
157            handle_command_duration_seconds: registry.register(metric!(
158                name: "mz_cluster_handle_command_duration_seconds",
159                help: "Time spent in handling commands.",
160                const_labels: {"cluster" => "compute"},
161                var_labels: ["worker_id", "command_type"],
162                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
163            )),
164            replica_expiration_timestamp_seconds: registry.register(metric!(
165                name: "mz_dataflow_replica_expiration_timestamp_seconds",
166                help: "The replica expiration timestamp in seconds since epoch.",
167                var_labels: ["worker_id"],
168            )),
169            replica_expiration_remaining_seconds: registry.register(metric!(
170                name: "mz_dataflow_replica_expiration_remaining_seconds",
171                help: "The remaining seconds until replica expiration. Can go negative, can lag behind.",
172                var_labels: ["worker_id"],
173            )),
174            collection_count: registry.register(metric!(
175                name: "mz_compute_collection_count",
176                help: "The number and hydration status of maintained compute collections.",
177                var_labels: ["worker_id", "type", "hydrated"],
178            )),
179        }
180    }
181
182    /// Sets the workload class for the compute metrics.
183    pub fn set_workload_class(&self, workload_class: Option<String>) {
184        let mut guard = self.workload_class.lock().expect("lock poisoned");
185        *guard = workload_class
186    }
187
188    pub fn for_worker(&self, worker_id: usize) -> WorkerMetrics {
189        let worker = worker_id.to_string();
190        let arrangement_maintenance_seconds_total = self
191            .arrangement_maintenance_seconds_total
192            .with_label_values(&[&worker]);
193        let arrangement_maintenance_active_info = self
194            .arrangement_maintenance_active_info
195            .with_label_values(&[&worker]);
196        let timely_step_duration_seconds = self
197            .timely_step_duration_seconds
198            .with_label_values(&[&worker]);
199        let persist_peek_seconds = self.persist_peek_seconds.with_label_values(&[&worker]);
200        let stashed_peek_seconds = self.stashed_peek_seconds.with_label_values(&[&worker]);
201        let handle_command_duration_seconds = CommandMetrics::build(|typ| {
202            self.handle_command_duration_seconds
203                .with_label_values(&[&worker, typ])
204        });
205        let replica_expiration_timestamp_seconds = self
206            .replica_expiration_timestamp_seconds
207            .with_label_values(&[&worker]);
208        let replica_expiration_remaining_seconds = self
209            .replica_expiration_remaining_seconds
210            .with_label_values(&[&worker]);
211        let shared_row_heap_capacity_bytes = self
212            .shared_row_heap_capacity_bytes
213            .with_label_values(&[&worker]);
214
215        WorkerMetrics {
216            worker_label: worker,
217            metrics: self.clone(),
218            arrangement_maintenance_seconds_total,
219            arrangement_maintenance_active_info,
220            timely_step_duration_seconds,
221            persist_peek_seconds,
222            stashed_peek_seconds,
223            handle_command_duration_seconds,
224            replica_expiration_timestamp_seconds,
225            replica_expiration_remaining_seconds,
226            shared_row_heap_capacity_bytes,
227        }
228    }
229}
230
/// Per-worker metrics.
#[derive(Clone, Debug)]
pub struct WorkerMetrics {
    // The stringified worker ID, used as the `worker_id` label value.
    worker_label: String,
    // Handle to the shared metrics, for metrics labeled by more than just `worker_id`.
    metrics: ComputeMetrics,

    /// The amount of time spent in arrangement maintenance.
    pub(crate) arrangement_maintenance_seconds_total: GenericCounter<AtomicF64>,
    /// 1 if this worker is currently doing maintenance.
    ///
    /// If maintenance turns out to take a very long time, this will allow us
    /// to gain a sense that Materialize is stuck on maintenance before the
    /// maintenance completes
    pub(crate) arrangement_maintenance_active_info: UIntGauge,
    /// Histogram of Timely step timings.
    pub(crate) timely_step_duration_seconds: Histogram,
    /// Histogram of persist peek durations.
    pub(crate) persist_peek_seconds: Histogram,
    /// Histogram of stashed peek durations.
    pub(crate) stashed_peek_seconds: Histogram,
    /// Histogram of command handling durations.
    pub(crate) handle_command_duration_seconds: CommandMetrics<Histogram>,
    /// The timestamp of replica expiration.
    pub(crate) replica_expiration_timestamp_seconds: UIntGauge,
    /// Remaining seconds until replica expiration.
    pub(crate) replica_expiration_remaining_seconds: raw::Gauge,
    /// Heap capacity of the shared row.
    shared_row_heap_capacity_bytes: UIntGauge,
}
260
261impl WorkerMetrics {
262    pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
263        let command_counts = CommandMetrics::build(|typ| {
264            self.metrics
265                .history_command_count
266                .with_label_values(&[&self.worker_label, typ])
267        });
268        let dataflow_count = self
269            .metrics
270            .history_dataflow_count
271            .with_label_values(&[&self.worker_label]);
272
273        HistoryMetrics {
274            command_counts,
275            dataflow_count,
276        }
277    }
278
279    /// Record the reconciliation result for a single dataflow.
280    ///
281    /// Reconciliation is recorded as successful if the given properties all hold. Otherwise it is
282    /// recorded as unsuccessful, with a reason based on the first property that does not hold.
283    ///
284    /// The properties are:
285    ///  * compatible: The old and new dataflow descriptions are compatible.
286    ///  * uncompacted: Collections currently installed for the dataflow exports have not been
287    ///                 allowed to compact beyond that new dataflow as-of.
288    ///  * subscribe_free: The dataflow does not export a subscribe sink.
289    ///  * copy_to_free: The dataflow does not export a copy-to sink.
290    ///  * dependencies_retained: All local inputs to the dataflow were retained by compute
291    ///                           reconciliation.
292    pub fn record_dataflow_reconciliation(
293        &self,
294        compatible: bool,
295        uncompacted: bool,
296        subscribe_free: bool,
297        copy_to_free: bool,
298        dependencies_retained: bool,
299    ) {
300        if !compatible {
301            self.metrics
302                .reconciliation_replaced_dataflows_count_total
303                .with_label_values(&[&self.worker_label, "incompatible"])
304                .inc();
305        } else if !uncompacted {
306            self.metrics
307                .reconciliation_replaced_dataflows_count_total
308                .with_label_values(&[&self.worker_label, "compacted"])
309                .inc();
310        } else if !subscribe_free {
311            self.metrics
312                .reconciliation_replaced_dataflows_count_total
313                .with_label_values(&[&self.worker_label, "subscribe"])
314                .inc();
315        } else if !copy_to_free {
316            self.metrics
317                .reconciliation_replaced_dataflows_count_total
318                .with_label_values(&[&self.worker_label, "copy-to"])
319                .inc();
320        } else if !dependencies_retained {
321            self.metrics
322                .reconciliation_replaced_dataflows_count_total
323                .with_label_values(&[&self.worker_label, "dependency"])
324                .inc();
325        } else {
326            self.metrics
327                .reconciliation_reused_dataflows_count_total
328                .with_label_values(&[&self.worker_label])
329                .inc();
330        }
331    }
332
333    /// Record the heap capacity of the shared row.
334    pub fn record_shared_row_metrics(&self) {
335        let binding = SharedRow::get();
336        self.shared_row_heap_capacity_bytes
337            .set(u64::cast_from(binding.byte_capacity()));
338    }
339
340    /// Increase the count of maintained collections.
341    fn inc_collection_count(&self, collection_type: &str, hydrated: bool) {
342        let hydrated = if hydrated { "1" } else { "0" };
343        self.metrics
344            .collection_count
345            .with_label_values(&[&self.worker_label, collection_type, hydrated])
346            .inc();
347    }
348
349    /// Decrease the count of maintained collections.
350    fn dec_collection_count(&self, collection_type: &str, hydrated: bool) {
351        let hydrated = if hydrated { "1" } else { "0" };
352        self.metrics
353            .collection_count
354            .with_label_values(&[&self.worker_label, collection_type, hydrated])
355            .dec();
356    }
357
358    /// Sets the workload class for the compute metrics.
359    pub fn set_workload_class(&self, workload_class: Option<String>) {
360        self.metrics.set_workload_class(workload_class);
361    }
362
363    pub fn for_collection(&self, id: GlobalId) -> CollectionMetrics {
364        CollectionMetrics::new(id, self.clone())
365    }
366}
367
/// Collection metrics.
///
/// Note that these metrics do _not_ have a `collection_id` label. We avoid introducing
/// per-collection, per-worker metrics because the number of resulting time series would
/// potentially be huge. Instead we count classes of collections, such as hydrated collections.
#[derive(Clone, Debug)]
pub struct CollectionMetrics {
    // Per-worker metrics handle through which the `collection_count` gauge is updated.
    metrics: WorkerMetrics,
    // The `type` label value derived from the collection's `GlobalId` variant.
    collection_type: &'static str,
    // Whether this collection has been recorded as hydrated; determines the
    // `hydrated` label value used when updating and (on drop) removing the count.
    collection_hydrated: bool,
}
379
380impl CollectionMetrics {
381    pub fn new(collection_id: GlobalId, metrics: WorkerMetrics) -> Self {
382        let collection_type = match collection_id {
383            GlobalId::System(_) => "system",
384            GlobalId::IntrospectionSourceIndex(_) => "log",
385            GlobalId::User(_) => "user",
386            GlobalId::Transient(_) => "transient",
387            GlobalId::Explain => "explain",
388        };
389        let collection_hydrated = false;
390
391        metrics.inc_collection_count(collection_type, collection_hydrated);
392
393        Self {
394            metrics,
395            collection_type,
396            collection_hydrated,
397        }
398    }
399
400    /// Record this collection as hydration.
401    pub fn record_collection_hydrated(&mut self) {
402        if self.collection_hydrated {
403            return;
404        }
405
406        self.metrics
407            .dec_collection_count(self.collection_type, false);
408        self.metrics
409            .inc_collection_count(self.collection_type, true);
410        self.collection_hydrated = true;
411    }
412}
413
impl Drop for CollectionMetrics {
    // On drop, remove this collection from the `collection_count` gauge, using the
    // hydration status it was last counted under so the matching labeled series
    // (incremented in `new` or `record_collection_hydrated`) is decremented.
    fn drop(&mut self) {
        self.metrics
            .dec_collection_count(self.collection_type, self.collection_hydrated);
    }
}