mz_persist/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation-specific metrics for persist blobs and consensus
11
12use std::sync::Arc;
13use std::time::Instant;
14
15use mz_dyncfg::ConfigSet;
16use mz_ore::lgbytes::{LgBytesMetrics, LgBytesOpMetrics};
17use mz_ore::metric;
18use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry};
19use prometheus::{CounterVec, IntCounterVec};
20
21/// Metrics specific to S3Blob's internal workings.
22#[derive(Debug, Clone)]
23pub struct S3BlobMetrics {
24    pub(crate) operation_timeouts: IntCounter,
25    pub(crate) operation_attempt_timeouts: IntCounter,
26    pub(crate) connect_timeouts: IntCounter,
27    pub(crate) read_timeouts: IntCounter,
28    pub(crate) get_part: IntCounter,
29    pub(crate) get_invalid_resp: IntCounter,
30    pub(crate) set_single: IntCounter,
31    pub(crate) set_multi_create: IntCounter,
32    pub(crate) set_multi_part: IntCounter,
33    pub(crate) set_multi_complete: IntCounter,
34    pub(crate) delete_head: IntCounter,
35    pub(crate) delete_object: IntCounter,
36    pub(crate) list_objects: IntCounter,
37    pub(crate) error_counts: IntCounterVec,
38
39    /// Metrics for all usages of LgBytes. Exposed as public for convenience in
40    /// persist boot, we'll have to pull this out and do the plumbing
41    /// differently if mz gains a non-persist user of LgBytes.
42    pub lgbytes: LgBytesMetrics,
43}
44
45impl S3BlobMetrics {
46    /// Returns a new [S3BlobMetrics] instance connected to the given registry.
47    pub fn new(registry: &MetricsRegistry) -> Self {
48        let operations: IntCounterVec = registry.register(metric!(
49            name: "mz_persist_s3_operations",
50            help: "number of raw s3 calls on behalf of Blob interface methods",
51            var_labels: ["op"],
52        ));
53        let errors: IntCounterVec = registry.register(metric!(
54            name: "mz_persist_s3_errors",
55            help: "errors",
56            var_labels: ["op", "code"],
57        ));
58        Self {
59            operation_timeouts: registry.register(metric!(
60                name: "mz_persist_s3_operation_timeouts",
61                help: "number of operation timeouts (including retries)",
62            )),
63            operation_attempt_timeouts: registry.register(metric!(
64                name: "mz_persist_s3_operation_attempt_timeouts",
65                help: "number of operation attempt timeouts (within a single retry)",
66            )),
67            connect_timeouts: registry.register(metric!(
68                name: "mz_persist_s3_connect_timeouts",
69                help: "number of timeouts establishing a connection to S3",
70            )),
71            read_timeouts: registry.register(metric!(
72                name: "mz_persist_s3_read_timeouts",
73                help: "number of timeouts waiting on first response byte from S3",
74            )),
75            get_part: operations.with_label_values(&["get_part"]),
76            get_invalid_resp: operations.with_label_values(&["get_invalid_resp"]),
77            set_single: operations.with_label_values(&["set_single"]),
78            set_multi_create: operations.with_label_values(&["set_multi_create"]),
79            set_multi_part: operations.with_label_values(&["set_multi_part"]),
80            set_multi_complete: operations.with_label_values(&["set_multi_complete"]),
81            delete_head: operations.with_label_values(&["delete_head"]),
82            delete_object: operations.with_label_values(&["delete_object"]),
83            list_objects: operations.with_label_values(&["list_objects"]),
84            error_counts: errors,
85            lgbytes: LgBytesMetrics::new(registry),
86        }
87    }
88}
89
90/// Metrics specific to our usage of Arrow and Parquet.
91#[derive(Debug, Clone)]
92pub struct ArrowMetrics {
93    pub(crate) key: ArrowColumnMetrics,
94    pub(crate) val: ArrowColumnMetrics,
95    pub(crate) part_build_seconds: Counter,
96    pub(crate) part_build_count: IntCounter,
97    pub(crate) concat_bytes: IntCounter,
98}
99
100impl ArrowMetrics {
101    /// Returns a new [ArrowMetrics] instance connected to the given registry.
102    pub fn new(registry: &MetricsRegistry) -> Self {
103        let op_count: IntCounterVec = registry.register(metric!(
104            name: "mz_persist_columnar_op_count",
105            help: "number of rows we've run the specified op on in our structured columnar format",
106            var_labels: ["op", "column", "result"],
107        ));
108        let op_seconds: CounterVec = registry.register(metric!(
109            name: "mz_persist_columnar_op_seconds",
110            help: "numer of seconds we've spent running the specified op on our structured columnar format",
111            var_labels: ["op", "column"],
112        ));
113
114        let part_build_seconds: Counter = registry.register(metric!(
115            name: "mz_persist_columnar_part_build_seconds",
116            help: "number of seconds we've spent encoding our structured columnar format",
117        ));
118        let part_build_count: IntCounter = registry.register(metric!(
119            name: "mz_persist_columnar_part_build_count",
120            help: "number of times we've encoded our structured columnar format",
121        ));
122        let concat_bytes: IntCounter = registry.register(metric!(
123            name: "mz_persist_columnar_part_concat_bytes",
124            help: "number of bytes we've copied when concatenating updates",
125        ));
126
127        ArrowMetrics {
128            key: ArrowColumnMetrics::new(&op_count, &op_seconds, "key"),
129            val: ArrowColumnMetrics::new(&op_count, &op_seconds, "val"),
130            part_build_seconds,
131            part_build_count,
132            concat_bytes,
133        }
134    }
135
136    /// Metrics for the top-level 'k_s' column.
137    pub fn key(&self) -> &ArrowColumnMetrics {
138        &self.key
139    }
140
141    /// Metrics for the top-level 'v_s' column.
142    pub fn val(&self) -> &ArrowColumnMetrics {
143        &self.val
144    }
145
146    /// Measure and report how long building a Part takes.
147    pub fn measure_part_build<R, F: FnOnce() -> R>(&self, f: F) -> R {
148        let start = Instant::now();
149        let r = f();
150        let duration = start.elapsed();
151
152        self.part_build_count.inc();
153        self.part_build_seconds.inc_by(duration.as_secs_f64());
154
155        r
156    }
157}
158
159/// Metrics for a top-level [`arrow`] column in our structured representation.
160#[derive(Debug, Clone)]
161pub struct ArrowColumnMetrics {
162    decoding_count: IntCounter,
163    decoding_seconds: Counter,
164    correct_count: IntCounter,
165    invalid_count: IntCounter,
166}
167
168impl ArrowColumnMetrics {
169    fn new(count: &IntCounterVec, duration: &CounterVec, col: &'static str) -> Self {
170        ArrowColumnMetrics {
171            decoding_count: count.with_label_values(&["decode", col, "success"]),
172            decoding_seconds: duration.with_label_values(&["decode", col]),
173            correct_count: count.with_label_values(&["validation", col, "correct"]),
174            invalid_count: count.with_label_values(&["validation", col, "invalid"]),
175        }
176    }
177
178    /// Measure and report how long decoding takes.
179    pub fn measure_decoding<R, F: FnOnce() -> R>(&self, decode: F) -> R {
180        let start = Instant::now();
181        let result = decode();
182        let duration = start.elapsed();
183
184        self.decoding_count.inc();
185        self.decoding_seconds.inc_by(duration.as_secs_f64());
186
187        result
188    }
189
190    /// Measure and report statistics for validation.
191    pub fn report_valid<F: FnOnce() -> bool>(&self, f: F) -> bool {
192        let is_valid = f();
193        if is_valid {
194            self.correct_count.inc();
195        } else {
196            self.invalid_count.inc();
197        }
198        is_valid
199    }
200}
201
202/// Metrics for a Parquet file that we write to S3.
203#[derive(Debug, Clone)]
204pub struct ParquetMetrics {
205    pub(crate) encoded_size: IntCounterVec,
206    pub(crate) num_row_groups: IntCounterVec,
207    pub(crate) k_metrics: ParquetColumnMetrics,
208    pub(crate) v_metrics: ParquetColumnMetrics,
209    pub(crate) t_metrics: ParquetColumnMetrics,
210    pub(crate) d_metrics: ParquetColumnMetrics,
211    pub(crate) k_s_metrics: ParquetColumnMetrics,
212    pub(crate) v_s_metrics: ParquetColumnMetrics,
213    pub(crate) elided_null_buffers: IntCounter,
214}
215
216impl ParquetMetrics {
217    pub(crate) fn new(registry: &MetricsRegistry) -> Self {
218        let encoded_size: IntCounterVec = registry.register(metric!(
219            name: "mz_persist_parquet_encoded_size",
220            help: "encoded size of a parquet file that we write to S3",
221            var_labels: ["format"],
222        ));
223        let num_row_groups: IntCounterVec = registry.register(metric!(
224            name: "mz_persist_parquet_row_group_count",
225            help: "count of row groups in a parquet file",
226            var_labels: ["format"],
227        ));
228
229        let column_size: IntCounterVec = registry.register(metric!(
230            name: "mz_persist_parquet_column_size",
231            help: "size in bytes of a column within a parquet file",
232            var_labels: ["col", "compressed"],
233        ));
234
235        ParquetMetrics {
236            encoded_size,
237            num_row_groups,
238            k_metrics: ParquetColumnMetrics::new("k", &column_size),
239            v_metrics: ParquetColumnMetrics::new("v", &column_size),
240            t_metrics: ParquetColumnMetrics::new("t", &column_size),
241            d_metrics: ParquetColumnMetrics::new("d", &column_size),
242            k_s_metrics: ParquetColumnMetrics::new("k_s", &column_size),
243            v_s_metrics: ParquetColumnMetrics::new("v_s", &column_size),
244            elided_null_buffers: registry.register(metric!(
245                name: "mz_persist_parquet_elided_null_buffer_count",
246                help: "times we dropped an unnecessary null buffer returned by parquet decoding",
247            )),
248        }
249    }
250}
251
252/// Metrics for a column within a Parquet file that we write to S3.
253#[derive(Debug, Clone)]
254pub struct ParquetColumnMetrics {
255    pub(crate) uncompressed_size: IntCounter,
256    pub(crate) compressed_size: IntCounter,
257}
258
259impl ParquetColumnMetrics {
260    pub(crate) fn new(col: &'static str, size: &IntCounterVec) -> Self {
261        ParquetColumnMetrics {
262            uncompressed_size: size.with_label_values(&[col, "uncompressed"]),
263            compressed_size: size.with_label_values(&[col, "compressed"]),
264        }
265    }
266
267    pub(crate) fn report_sizes(&self, uncompressed: u64, compressed: u64) {
268        self.uncompressed_size.inc_by(uncompressed);
269        self.compressed_size.inc_by(compressed);
270    }
271}
272
273/// Metrics for `ColumnarRecords`.
274#[derive(Debug)]
275pub struct ColumnarMetrics {
276    pub(crate) lgbytes_arrow: LgBytesOpMetrics,
277    pub(crate) parquet: ParquetMetrics,
278    pub(crate) arrow: ArrowMetrics,
279    // TODO: Having these two here isn't quite the right thing to do, but it
280    // saves a LOT of plumbing.
281    pub(crate) cfg: Arc<ConfigSet>,
282    pub(crate) is_cc_active: bool,
283}
284
285impl ColumnarMetrics {
286    /// Returns a new [ColumnarMetrics].
287    pub fn new(
288        registry: &MetricsRegistry,
289        lgbytes: &LgBytesMetrics,
290        cfg: Arc<ConfigSet>,
291        is_cc_active: bool,
292    ) -> Self {
293        ColumnarMetrics {
294            parquet: ParquetMetrics::new(registry),
295            arrow: ArrowMetrics::new(registry),
296            lgbytes_arrow: lgbytes.persist_arrow.clone(),
297            cfg,
298            is_cc_active,
299        }
300    }
301
302    /// Returns a reference to the [`arrow`] metrics for our structured data representation.
303    pub fn arrow(&self) -> &ArrowMetrics {
304        &self.arrow
305    }
306
307    /// Returns a reference to the [`parquet`] metrics for our structured data representation.
308    pub fn parquet(&self) -> &ParquetMetrics {
309        &self.parquet
310    }
311
312    /// Returns a [ColumnarMetrics] disconnected from any metrics registry.
313    ///
314    /// Exposed for testing.
315    pub fn disconnected() -> Self {
316        let registry = MetricsRegistry::new();
317        let lgbytes = LgBytesMetrics::new(&registry);
318        let cfg = crate::cfg::all_dyn_configs(ConfigSet::default());
319
320        Self::new(&registry, &lgbytes, Arc::new(cfg), false)
321    }
322}