mz_compute/metrics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::{Arc, Mutex};

use mz_compute_client::metrics::{CommandMetrics, HistoryMetrics};
use mz_ore::cast::CastFrom;
use mz_ore::metric;
use mz_ore::metrics::{MetricsRegistry, UIntGauge, raw};
use mz_repr::{GlobalId, SharedRow};
use prometheus::core::{AtomicF64, GenericCounter};
use prometheus::proto::LabelPair;
use prometheus::{Histogram, HistogramVec};

/// Metrics exposed by compute replicas.
//
// Most of the metrics here use the `raw` implementations, rather than the `DeleteOnDrop` wrappers,
// because their labels are fixed throughout the lifetime of the replica process. For example, any
// metric labeled only by `worker_id` can be `raw` since the number of workers cannot change.
//
// Metrics that are labeled by a dimension that can change throughout the lifetime of the process
// (such as `collection_id`) MUST NOT use the `raw` metric types and must use the `DeleteOnDrop`
// types instead, to avoid memory leaks.
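//
// For example (sketch with assumed names, not taken from this file): a hypothetical
// per-collection metric would be declared on a non-`raw` vector type, e.g.
//
//     // collection_errors: IntCounterVec,         // non-raw wrapper
//     // ... declared with var_labels: ["collection_id"] ...
//
// and each labeled handle would live only as long as its collection, so the time
// series is removed from the registry when the collection is dropped.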
#[derive(Clone, Debug)]
pub struct ComputeMetrics {
    // Optional workload class label to apply to all metrics in the registry.
    workload_class: Arc<Mutex<Option<String>>>,

    // command history
    history_command_count: raw::UIntGaugeVec,
    history_dataflow_count: raw::UIntGaugeVec,

    // reconciliation
    reconciliation_reused_dataflows_count_total: raw::IntCounterVec,
    reconciliation_replaced_dataflows_count_total: raw::IntCounterVec,

    // arrangements
    arrangement_maintenance_seconds_total: raw::CounterVec,
    arrangement_maintenance_active_info: raw::UIntGaugeVec,

    // timings
    //
    // Note that this particular metric unfortunately takes some care to
    // interpret. It measures the duration of step_or_park calls, which
    // undesirably includes the parking. This is probably fine because we
    // regularly send progress information through persist sources, which likely
    // means the parking is capped at a second or two in practice. It also
    // doesn't do anything to let you pinpoint _which_ operator or worker isn't
    // yielding, but it should hopefully alert us when there is something to
    // look at.
    timely_step_duration_seconds: HistogramVec,
    persist_peek_seconds: HistogramVec,
    handle_command_duration_seconds: HistogramVec,

    // memory usage
    shared_row_heap_capacity_bytes: raw::UIntGaugeVec,

    // replica expiration
    replica_expiration_timestamp_seconds: raw::UIntGaugeVec,
    replica_expiration_remaining_seconds: raw::GaugeVec,

    // collections
    collection_count: raw::UIntGaugeVec,
}

impl ComputeMetrics {
    pub fn register_with(registry: &MetricsRegistry) -> Self {
        let workload_class = Arc::new(Mutex::new(None));

        // Apply a `workload_class` label to all metrics in the registry when we
        // have a known workload class.
        registry.register_postprocessor({
            let workload_class = Arc::clone(&workload_class);
            move |metrics| {
                let workload_class: Option<String> =
                    workload_class.lock().expect("lock poisoned").clone();
                let Some(workload_class) = workload_class else {
                    return;
                };
                for metric in metrics {
                    for metric in metric.mut_metric() {
                        let mut label = LabelPair::default();
                        label.set_name("workload_class".into());
                        label.set_value(workload_class.clone());

                        let mut labels = metric.take_label();
                        labels.push(label);
                        metric.set_label(labels);
                    }
                }
            }
        });

        Self {
            workload_class,
            history_command_count: registry.register(metric!(
                name: "mz_compute_replica_history_command_count",
                help: "The number of commands in the replica's command history.",
                var_labels: ["worker_id", "command_type"],
            )),
            history_dataflow_count: registry.register(metric!(
                name: "mz_compute_replica_history_dataflow_count",
                help: "The number of dataflows in the replica's command history.",
                var_labels: ["worker_id"],
            )),
            reconciliation_reused_dataflows_count_total: registry.register(metric!(
                name: "mz_compute_reconciliation_reused_dataflows_count_total",
                help: "The total number of dataflows that were reused during compute reconciliation.",
                var_labels: ["worker_id"],
            )),
            reconciliation_replaced_dataflows_count_total: registry.register(metric!(
                name: "mz_compute_reconciliation_replaced_dataflows_count_total",
                help: "The total number of dataflows that were replaced during compute reconciliation.",
                var_labels: ["worker_id", "reason"],
            )),
            arrangement_maintenance_seconds_total: registry.register(metric!(
                name: "mz_arrangement_maintenance_seconds_total",
                help: "The total time spent maintaining arrangements.",
                var_labels: ["worker_id"],
            )),
            arrangement_maintenance_active_info: registry.register(metric!(
                name: "mz_arrangement_maintenance_active_info",
                help: "Whether maintenance is currently occurring.",
                var_labels: ["worker_id"],
            )),
            timely_step_duration_seconds: registry.register(metric!(
                name: "mz_timely_step_duration_seconds",
                help: "The time spent in each compute step_or_park call.",
                const_labels: {"cluster" => "compute"},
                var_labels: ["worker_id"],
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 32.0),
            )),
            shared_row_heap_capacity_bytes: registry.register(metric!(
                name: "mz_dataflow_shared_row_heap_capacity_bytes",
                help: "The heap capacity of the shared row.",
                var_labels: ["worker_id"],
            )),
            persist_peek_seconds: registry.register(metric!(
                name: "mz_persist_peek_seconds",
                help: "Time spent in (experimental) Persist fast-path peeks.",
                var_labels: ["worker_id"],
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            handle_command_duration_seconds: registry.register(metric!(
                name: "mz_cluster_handle_command_duration_seconds",
                help: "Time spent in handling commands.",
                const_labels: {"cluster" => "compute"},
                var_labels: ["worker_id", "command_type"],
                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
            )),
            replica_expiration_timestamp_seconds: registry.register(metric!(
                name: "mz_dataflow_replica_expiration_timestamp_seconds",
                help: "The replica expiration timestamp in seconds since epoch.",
                var_labels: ["worker_id"],
            )),
            replica_expiration_remaining_seconds: registry.register(metric!(
                name: "mz_dataflow_replica_expiration_remaining_seconds",
                help: "The remaining seconds until replica expiration. Can go negative, can lag behind.",
                var_labels: ["worker_id"],
            )),
            collection_count: registry.register(metric!(
                name: "mz_compute_collection_count",
                help: "The number and hydration status of maintained compute collections.",
                var_labels: ["worker_id", "type", "hydrated"],
            )),
        }
    }

    /// Sets the workload class for the compute metrics.
    pub fn set_workload_class(&self, workload_class: Option<String>) {
        let mut guard = self.workload_class.lock().expect("lock poisoned");
        *guard = workload_class;
    }
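
    // Example (sketch): wiring up the compute metrics. `MetricsRegistry::new()`
    // stands in for however the replica process obtains its registry, and the
    // workload class name is made up for illustration.
    //
    //     let registry = MetricsRegistry::new();
    //     let metrics = ComputeMetrics::register_with(&registry);
    //     metrics.set_workload_class(Some("analytics".into()));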

    pub fn for_worker(&self, worker_id: usize) -> WorkerMetrics {
        let worker = worker_id.to_string();
        let arrangement_maintenance_seconds_total = self
            .arrangement_maintenance_seconds_total
            .with_label_values(&[&worker]);
        let arrangement_maintenance_active_info = self
            .arrangement_maintenance_active_info
            .with_label_values(&[&worker]);
        let timely_step_duration_seconds = self
            .timely_step_duration_seconds
            .with_label_values(&[&worker]);
        let persist_peek_seconds = self.persist_peek_seconds.with_label_values(&[&worker]);
        let handle_command_duration_seconds = CommandMetrics::build(|typ| {
            self.handle_command_duration_seconds
                .with_label_values(&[&worker, typ])
        });
        let replica_expiration_timestamp_seconds = self
            .replica_expiration_timestamp_seconds
            .with_label_values(&[&worker]);
        let replica_expiration_remaining_seconds = self
            .replica_expiration_remaining_seconds
            .with_label_values(&[&worker]);
        let shared_row_heap_capacity_bytes = self
            .shared_row_heap_capacity_bytes
            .with_label_values(&[&worker]);

        WorkerMetrics {
            worker_label: worker,
            metrics: self.clone(),
            arrangement_maintenance_seconds_total,
            arrangement_maintenance_active_info,
            timely_step_duration_seconds,
            persist_peek_seconds,
            handle_command_duration_seconds,
            replica_expiration_timestamp_seconds,
            replica_expiration_remaining_seconds,
            shared_row_heap_capacity_bytes,
        }
    }
}
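
// Example (sketch): obtaining per-worker handles. `worker_index` is assumed to be
// the index of the current Timely worker; it is not defined in this file.
//
//     let worker_metrics = metrics.for_worker(worker_index);
//     let history_metrics = worker_metrics.for_history();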

/// Per-worker metrics.
#[derive(Clone, Debug)]
pub struct WorkerMetrics {
    worker_label: String,
    metrics: ComputeMetrics,

    /// The amount of time spent in arrangement maintenance.
    pub(crate) arrangement_maintenance_seconds_total: GenericCounter<AtomicF64>,
    /// 1 if this worker is currently doing maintenance.
    ///
    /// If maintenance turns out to take a very long time, this will allow us
    /// to gain a sense that Materialize is stuck on maintenance before the
    /// maintenance completes.
    pub(crate) arrangement_maintenance_active_info: UIntGauge,
    /// Histogram of Timely step timings.
    pub(crate) timely_step_duration_seconds: Histogram,
    /// Histogram of persist peek durations.
    pub(crate) persist_peek_seconds: Histogram,
    /// Histogram of command handling durations.
    pub(crate) handle_command_duration_seconds: CommandMetrics<Histogram>,
    /// The timestamp of replica expiration.
    pub(crate) replica_expiration_timestamp_seconds: UIntGauge,
    /// Remaining seconds until replica expiration.
    pub(crate) replica_expiration_remaining_seconds: raw::Gauge,
    /// Heap capacity of the shared row.
    shared_row_heap_capacity_bytes: UIntGauge,
}

impl WorkerMetrics {
    pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
        let command_counts = CommandMetrics::build(|typ| {
            self.metrics
                .history_command_count
                .with_label_values(&[&self.worker_label, typ])
        });
        let dataflow_count = self
            .metrics
            .history_dataflow_count
            .with_label_values(&[&self.worker_label]);

        HistoryMetrics {
            command_counts,
            dataflow_count,
        }
    }

    /// Record the reconciliation result for a single dataflow.
    ///
    /// Reconciliation is recorded as successful if the given properties all hold. Otherwise it is
    /// recorded as unsuccessful, with a reason based on the first property that does not hold.
    pub fn record_dataflow_reconciliation(
        &self,
        compatible: bool,
        uncompacted: bool,
        subscribe_free: bool,
        dependencies_retained: bool,
    ) {
        if !compatible {
            self.metrics
                .reconciliation_replaced_dataflows_count_total
                .with_label_values(&[&self.worker_label, "incompatible"])
                .inc();
        } else if !uncompacted {
            self.metrics
                .reconciliation_replaced_dataflows_count_total
                .with_label_values(&[&self.worker_label, "compacted"])
                .inc();
        } else if !subscribe_free {
            self.metrics
                .reconciliation_replaced_dataflows_count_total
                .with_label_values(&[&self.worker_label, "subscribe"])
                .inc();
        } else if !dependencies_retained {
            self.metrics
                .reconciliation_replaced_dataflows_count_total
                .with_label_values(&[&self.worker_label, "dependency"])
                .inc();
        } else {
            self.metrics
                .reconciliation_reused_dataflows_count_total
                .with_label_values(&[&self.worker_label])
                .inc();
        }
    }
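
    // Example (sketch): a dataflow that is compatible, subscribe-free, and retains
    // its dependencies, but fails the `uncompacted` check, is recorded as replaced
    // with reason "compacted":
    //
    //     worker_metrics.record_dataflow_reconciliation(true, false, true, true);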

    /// Record the heap capacity of the shared row.
    pub fn record_shared_row_metrics(&self) {
        let binding = SharedRow::get();
        self.shared_row_heap_capacity_bytes
            .set(u64::cast_from(binding.borrow().byte_capacity()));
    }

    /// Increase the count of maintained collections.
    fn inc_collection_count(&self, collection_type: &str, hydrated: bool) {
        let hydrated = if hydrated { "1" } else { "0" };
        self.metrics
            .collection_count
            .with_label_values(&[&self.worker_label, collection_type, hydrated])
            .inc();
    }

    /// Decrease the count of maintained collections.
    fn dec_collection_count(&self, collection_type: &str, hydrated: bool) {
        let hydrated = if hydrated { "1" } else { "0" };
        self.metrics
            .collection_count
            .with_label_values(&[&self.worker_label, collection_type, hydrated])
            .dec();
    }

    /// Sets the workload class for the compute metrics.
    pub fn set_workload_class(&self, workload_class: Option<String>) {
        self.metrics.set_workload_class(workload_class);
    }

    pub fn for_collection(&self, id: GlobalId) -> CollectionMetrics {
        CollectionMetrics::new(id, self.clone())
    }
}

/// Collection metrics.
///
/// Note that these metrics do _not_ have a `collection_id` label. We avoid introducing
/// per-collection, per-worker metrics because the number of resulting time series would
/// potentially be huge. Instead we count classes of collections, such as hydrated collections.
#[derive(Clone, Debug)]
pub struct CollectionMetrics {
    metrics: WorkerMetrics,
    collection_type: &'static str,
    collection_hydrated: bool,
}

impl CollectionMetrics {
    pub fn new(collection_id: GlobalId, metrics: WorkerMetrics) -> Self {
        let collection_type = match collection_id {
            GlobalId::System(_) => "system",
            GlobalId::IntrospectionSourceIndex(_) => "log",
            GlobalId::User(_) => "user",
            GlobalId::Transient(_) => "transient",
            GlobalId::Explain => "explain",
        };
        let collection_hydrated = false;

        metrics.inc_collection_count(collection_type, collection_hydrated);

        Self {
            metrics,
            collection_type,
            collection_hydrated,
        }
    }

    /// Record this collection as hydrated.
    pub fn record_collection_hydrated(&mut self) {
        if self.collection_hydrated {
            return;
        }

        self.metrics
            .dec_collection_count(self.collection_type, false);
        self.metrics
            .inc_collection_count(self.collection_type, true);
        self.collection_hydrated = true;
    }
}

impl Drop for CollectionMetrics {
    fn drop(&mut self) {
        self.metrics
            .dec_collection_count(self.collection_type, self.collection_hydrated);
    }
}
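
// Example (sketch): the intended lifecycle of `CollectionMetrics`, assuming `id` is
// the `GlobalId` of a maintained collection and `worker_metrics` is a `WorkerMetrics`.
//
//     let mut collection_metrics = worker_metrics.for_collection(id);
//     // Once the collection becomes hydrated:
//     collection_metrics.record_collection_hydrated();
//     // Dropping the handle decrements the matching `collection_count` gauge.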