mz_persist/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation-specific metrics for persist blobs and consensus
11
12use std::sync::Arc;
13use std::time::Instant;
14
15use mz_dyncfg::ConfigSet;
16use mz_ore::lgbytes::{LgBytesMetrics, LgBytesOpMetrics};
17use mz_ore::metric;
18use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry};
19use prometheus::IntCounterVec;
20
21/// Metrics specific to S3Blob's internal workings.
22#[derive(Debug, Clone)]
23pub struct S3BlobMetrics {
24    pub(crate) operation_timeouts: IntCounter,
25    pub(crate) operation_attempt_timeouts: IntCounter,
26    pub(crate) connect_timeouts: IntCounter,
27    pub(crate) read_timeouts: IntCounter,
28    pub(crate) get_part: IntCounter,
29    pub(crate) get_invalid_resp: IntCounter,
30    pub(crate) set_single: IntCounter,
31    pub(crate) set_multi_create: IntCounter,
32    pub(crate) set_multi_part: IntCounter,
33    pub(crate) set_multi_complete: IntCounter,
34    pub(crate) delete_head: IntCounter,
35    pub(crate) delete_object: IntCounter,
36    pub(crate) list_objects: IntCounter,
37    pub(crate) error_counts: IntCounterVec,
38
39    /// Metrics for all usages of LgBytes. Exposed as public for convenience in
40    /// persist boot, we'll have to pull this out and do the plumbing
41    /// differently if mz gains a non-persist user of LgBytes.
42    pub lgbytes: LgBytesMetrics,
43}
44
45impl S3BlobMetrics {
46    /// Returns a new [S3BlobMetrics] instance connected to the given registry.
47    pub fn new(registry: &MetricsRegistry) -> Self {
48        let operations: IntCounterVec = registry.register(metric!(
49            name: "mz_persist_s3_operations",
50            help: "number of raw s3 calls on behalf of Blob interface methods",
51            var_labels: ["op"],
52        ));
53        let errors: IntCounterVec = registry.register(metric!(
54            name: "mz_persist_s3_errors",
55            help: "errors",
56            var_labels: ["op", "code"],
57        ));
58        Self {
59            operation_timeouts: registry.register(metric!(
60                name: "mz_persist_s3_operation_timeouts",
61                help: "number of operation timeouts (including retries)",
62            )),
63            operation_attempt_timeouts: registry.register(metric!(
64                name: "mz_persist_s3_operation_attempt_timeouts",
65                help: "number of operation attempt timeouts (within a single retry)",
66            )),
67            connect_timeouts: registry.register(metric!(
68                name: "mz_persist_s3_connect_timeouts",
69                help: "number of timeouts establishing a connection to S3",
70            )),
71            read_timeouts: registry.register(metric!(
72                name: "mz_persist_s3_read_timeouts",
73                help: "number of timeouts waiting on first response byte from S3",
74            )),
75            get_part: operations.with_label_values(&["get_part"]),
76            get_invalid_resp: operations.with_label_values(&["get_invalid_resp"]),
77            set_single: operations.with_label_values(&["set_single"]),
78            set_multi_create: operations.with_label_values(&["set_multi_create"]),
79            set_multi_part: operations.with_label_values(&["set_multi_part"]),
80            set_multi_complete: operations.with_label_values(&["set_multi_complete"]),
81            delete_head: operations.with_label_values(&["delete_head"]),
82            delete_object: operations.with_label_values(&["delete_object"]),
83            list_objects: operations.with_label_values(&["list_objects"]),
84            error_counts: errors,
85            lgbytes: LgBytesMetrics::new(registry),
86        }
87    }
88}
89
90/// Metrics specific to our usage of Arrow and Parquet.
91#[derive(Debug, Clone)]
92pub struct ArrowMetrics {
93    pub(crate) key: ArrowColumnMetrics,
94    pub(crate) val: ArrowColumnMetrics,
95    pub(crate) part_build_seconds: Counter,
96    pub(crate) part_build_count: IntCounter,
97    pub(crate) concat_bytes: IntCounter,
98}
99
100impl ArrowMetrics {
101    /// Returns a new [ArrowMetrics] instance connected to the given registry.
102    pub fn new(registry: &MetricsRegistry) -> Self {
103        let op_count: IntCounterVec = registry.register(metric!(
104            name: "mz_persist_columnar_op_count",
105            help: "number of rows we've run the specified op on in our structured columnar format",
106            var_labels: ["op", "column", "result"],
107        ));
108
109        let part_build_seconds: Counter = registry.register(metric!(
110            name: "mz_persist_columnar_part_build_seconds",
111            help: "number of seconds we've spent encoding our structured columnar format",
112        ));
113        let part_build_count: IntCounter = registry.register(metric!(
114            name: "mz_persist_columnar_part_build_count",
115            help: "number of times we've encoded our structured columnar format",
116        ));
117        let concat_bytes: IntCounter = registry.register(metric!(
118            name: "mz_persist_columnar_part_concat_bytes",
119            help: "number of bytes we've copied when concatenating updates",
120        ));
121
122        ArrowMetrics {
123            key: ArrowColumnMetrics::new(&op_count, "key"),
124            val: ArrowColumnMetrics::new(&op_count, "val"),
125            part_build_seconds,
126            part_build_count,
127            concat_bytes,
128        }
129    }
130
131    /// Metrics for the top-level 'k_s' column.
132    pub fn key(&self) -> &ArrowColumnMetrics {
133        &self.key
134    }
135
136    /// Metrics for the top-level 'v_s' column.
137    pub fn val(&self) -> &ArrowColumnMetrics {
138        &self.val
139    }
140
141    /// Measure and report how long building a Part takes.
142    pub fn measure_part_build<R, F: FnOnce() -> R>(&self, f: F) -> R {
143        let start = Instant::now();
144        let r = f();
145        let duration = start.elapsed();
146
147        self.part_build_count.inc();
148        self.part_build_seconds.inc_by(duration.as_secs_f64());
149
150        r
151    }
152}
153
154/// Metrics for a top-level [`arrow`] column in our structured representation.
155#[derive(Debug, Clone)]
156pub struct ArrowColumnMetrics {
157    correct_count: IntCounter,
158    invalid_count: IntCounter,
159}
160
161impl ArrowColumnMetrics {
162    fn new(count: &IntCounterVec, col: &'static str) -> Self {
163        ArrowColumnMetrics {
164            correct_count: count.with_label_values(&["validation", col, "correct"]),
165            invalid_count: count.with_label_values(&["validation", col, "invalid"]),
166        }
167    }
168
169    /// Measure and report statistics for validation.
170    pub fn report_valid<F: FnOnce() -> bool>(&self, f: F) -> bool {
171        let is_valid = f();
172        if is_valid {
173            self.correct_count.inc();
174        } else {
175            self.invalid_count.inc();
176        }
177        is_valid
178    }
179}
180
181/// Metrics for a Parquet file that we write to S3.
182#[derive(Debug, Clone)]
183pub struct ParquetMetrics {
184    pub(crate) encoded_size: IntCounterVec,
185    pub(crate) num_row_groups: IntCounterVec,
186    pub(crate) k_metrics: ParquetColumnMetrics,
187    pub(crate) v_metrics: ParquetColumnMetrics,
188    pub(crate) t_metrics: ParquetColumnMetrics,
189    pub(crate) d_metrics: ParquetColumnMetrics,
190    pub(crate) k_s_metrics: ParquetColumnMetrics,
191    pub(crate) v_s_metrics: ParquetColumnMetrics,
192    pub(crate) elided_null_buffers: IntCounter,
193}
194
195impl ParquetMetrics {
196    pub(crate) fn new(registry: &MetricsRegistry) -> Self {
197        let encoded_size: IntCounterVec = registry.register(metric!(
198            name: "mz_persist_parquet_encoded_size",
199            help: "encoded size of a parquet file that we write to S3",
200            var_labels: ["format"],
201        ));
202        let num_row_groups: IntCounterVec = registry.register(metric!(
203            name: "mz_persist_parquet_row_group_count",
204            help: "count of row groups in a parquet file",
205            var_labels: ["format"],
206        ));
207
208        let column_size: IntCounterVec = registry.register(metric!(
209            name: "mz_persist_parquet_column_size",
210            help: "size in bytes of a column within a parquet file",
211            var_labels: ["col", "compressed"],
212        ));
213
214        ParquetMetrics {
215            encoded_size,
216            num_row_groups,
217            k_metrics: ParquetColumnMetrics::new("k", &column_size),
218            v_metrics: ParquetColumnMetrics::new("v", &column_size),
219            t_metrics: ParquetColumnMetrics::new("t", &column_size),
220            d_metrics: ParquetColumnMetrics::new("d", &column_size),
221            k_s_metrics: ParquetColumnMetrics::new("k_s", &column_size),
222            v_s_metrics: ParquetColumnMetrics::new("v_s", &column_size),
223            elided_null_buffers: registry.register(metric!(
224                name: "mz_persist_parquet_elided_null_buffer_count",
225                help: "times we dropped an unnecessary null buffer returned by parquet decoding",
226            )),
227        }
228    }
229}
230
231/// Metrics for a column within a Parquet file that we write to S3.
232#[derive(Debug, Clone)]
233pub struct ParquetColumnMetrics {
234    pub(crate) uncompressed_size: IntCounter,
235    pub(crate) compressed_size: IntCounter,
236}
237
238impl ParquetColumnMetrics {
239    pub(crate) fn new(col: &'static str, size: &IntCounterVec) -> Self {
240        ParquetColumnMetrics {
241            uncompressed_size: size.with_label_values(&[col, "uncompressed"]),
242            compressed_size: size.with_label_values(&[col, "compressed"]),
243        }
244    }
245
246    pub(crate) fn report_sizes(&self, uncompressed: u64, compressed: u64) {
247        self.uncompressed_size.inc_by(uncompressed);
248        self.compressed_size.inc_by(compressed);
249    }
250}
251
252/// Metrics for `ColumnarRecords`.
253#[derive(Debug)]
254pub struct ColumnarMetrics {
255    pub(crate) lgbytes_arrow: LgBytesOpMetrics,
256    pub(crate) parquet: ParquetMetrics,
257    pub(crate) arrow: ArrowMetrics,
258    // TODO: Having these two here isn't quite the right thing to do, but it
259    // saves a LOT of plumbing.
260    pub(crate) cfg: Arc<ConfigSet>,
261    pub(crate) is_cc_active: bool,
262}
263
264impl ColumnarMetrics {
265    /// Returns a new [ColumnarMetrics].
266    pub fn new(
267        registry: &MetricsRegistry,
268        lgbytes: &LgBytesMetrics,
269        cfg: Arc<ConfigSet>,
270        is_cc_active: bool,
271    ) -> Self {
272        ColumnarMetrics {
273            parquet: ParquetMetrics::new(registry),
274            arrow: ArrowMetrics::new(registry),
275            lgbytes_arrow: lgbytes.persist_arrow.clone(),
276            cfg,
277            is_cc_active,
278        }
279    }
280
281    /// Returns a reference to the [`arrow`] metrics for our structured data representation.
282    pub fn arrow(&self) -> &ArrowMetrics {
283        &self.arrow
284    }
285
286    /// Returns a reference to the [`parquet`] metrics for our structured data representation.
287    pub fn parquet(&self) -> &ParquetMetrics {
288        &self.parquet
289    }
290
291    /// Returns a [ColumnarMetrics] disconnected from any metrics registry.
292    ///
293    /// Exposed for testing.
294    pub fn disconnected() -> Self {
295        let registry = MetricsRegistry::new();
296        let lgbytes = LgBytesMetrics::new(&registry);
297        let cfg = crate::cfg::all_dyn_configs(ConfigSet::default());
298
299        Self::new(&registry, &lgbytes, Arc::new(cfg), false)
300    }
301}