Skip to main content

mz_persist/
metrics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
//! Implementation-specific metrics for persist blobs and consensus.
11
12use std::time::Instant;
13
14use mz_ore::metric;
15use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry};
16use prometheus::IntCounterVec;
17
18/// Metrics specific to S3Blob's internal workings.
19#[derive(Debug, Clone)]
20pub struct S3BlobMetrics {
21    pub(crate) operation_timeouts: IntCounter,
22    pub(crate) operation_attempt_timeouts: IntCounter,
23    pub(crate) connect_timeouts: IntCounter,
24    pub(crate) read_timeouts: IntCounter,
25    pub(crate) get_part: IntCounter,
26    pub(crate) get_invalid_resp: IntCounter,
27    pub(crate) set_single: IntCounter,
28    pub(crate) set_multi_create: IntCounter,
29    pub(crate) set_multi_part: IntCounter,
30    pub(crate) set_multi_complete: IntCounter,
31    pub(crate) delete_head: IntCounter,
32    pub(crate) delete_object: IntCounter,
33    pub(crate) list_objects: IntCounter,
34    pub(crate) error_counts: IntCounterVec,
35}
36
37impl S3BlobMetrics {
38    /// Returns a new [S3BlobMetrics] instance connected to the given registry.
39    pub fn new(registry: &MetricsRegistry) -> Self {
40        let operations: IntCounterVec = registry.register(metric!(
41            name: "mz_persist_s3_operations",
42            help: "number of raw s3 calls on behalf of Blob interface methods",
43            var_labels: ["op"],
44        ));
45        let errors: IntCounterVec = registry.register(metric!(
46            name: "mz_persist_s3_errors",
47            help: "errors",
48            var_labels: ["op", "code"],
49        ));
50        Self {
51            operation_timeouts: registry.register(metric!(
52                name: "mz_persist_s3_operation_timeouts",
53                help: "number of operation timeouts (including retries)",
54            )),
55            operation_attempt_timeouts: registry.register(metric!(
56                name: "mz_persist_s3_operation_attempt_timeouts",
57                help: "number of operation attempt timeouts (within a single retry)",
58            )),
59            connect_timeouts: registry.register(metric!(
60                name: "mz_persist_s3_connect_timeouts",
61                help: "number of timeouts establishing a connection to S3",
62            )),
63            read_timeouts: registry.register(metric!(
64                name: "mz_persist_s3_read_timeouts",
65                help: "number of timeouts waiting on first response byte from S3",
66            )),
67            get_part: operations.with_label_values(&["get_part"]),
68            get_invalid_resp: operations.with_label_values(&["get_invalid_resp"]),
69            set_single: operations.with_label_values(&["set_single"]),
70            set_multi_create: operations.with_label_values(&["set_multi_create"]),
71            set_multi_part: operations.with_label_values(&["set_multi_part"]),
72            set_multi_complete: operations.with_label_values(&["set_multi_complete"]),
73            delete_head: operations.with_label_values(&["delete_head"]),
74            delete_object: operations.with_label_values(&["delete_object"]),
75            list_objects: operations.with_label_values(&["list_objects"]),
76            error_counts: errors,
77        }
78    }
79}
80
81/// Metrics specific to our usage of Arrow and Parquet.
82#[derive(Debug, Clone)]
83pub struct ArrowMetrics {
84    pub(crate) key: ArrowColumnMetrics,
85    pub(crate) val: ArrowColumnMetrics,
86    pub(crate) part_build_seconds: Counter,
87    pub(crate) part_build_count: IntCounter,
88    pub(crate) concat_bytes: IntCounter,
89}
90
91impl ArrowMetrics {
92    /// Returns a new [ArrowMetrics] instance connected to the given registry.
93    pub fn new(registry: &MetricsRegistry) -> Self {
94        let op_count: IntCounterVec = registry.register(metric!(
95            name: "mz_persist_columnar_op_count",
96            help: "number of rows we've run the specified op on in our structured columnar format",
97            var_labels: ["op", "column", "result"],
98        ));
99
100        let part_build_seconds: Counter = registry.register(metric!(
101            name: "mz_persist_columnar_part_build_seconds",
102            help: "number of seconds we've spent encoding our structured columnar format",
103        ));
104        let part_build_count: IntCounter = registry.register(metric!(
105            name: "mz_persist_columnar_part_build_count",
106            help: "number of times we've encoded our structured columnar format",
107        ));
108        let concat_bytes: IntCounter = registry.register(metric!(
109            name: "mz_persist_columnar_part_concat_bytes",
110            help: "number of bytes we've copied when concatenating updates",
111        ));
112
113        ArrowMetrics {
114            key: ArrowColumnMetrics::new(&op_count, "key"),
115            val: ArrowColumnMetrics::new(&op_count, "val"),
116            part_build_seconds,
117            part_build_count,
118            concat_bytes,
119        }
120    }
121
122    /// Metrics for the top-level 'k_s' column.
123    pub fn key(&self) -> &ArrowColumnMetrics {
124        &self.key
125    }
126
127    /// Metrics for the top-level 'v_s' column.
128    pub fn val(&self) -> &ArrowColumnMetrics {
129        &self.val
130    }
131
132    /// Measure and report how long building a Part takes.
133    pub fn measure_part_build<R, F: FnOnce() -> R>(&self, f: F) -> R {
134        let start = Instant::now();
135        let r = f();
136        let duration = start.elapsed();
137
138        self.part_build_count.inc();
139        self.part_build_seconds.inc_by(duration.as_secs_f64());
140
141        r
142    }
143}
144
145/// Metrics for a top-level [`arrow`] column in our structured representation.
146#[derive(Debug, Clone)]
147pub struct ArrowColumnMetrics {
148    correct_count: IntCounter,
149    invalid_count: IntCounter,
150}
151
152impl ArrowColumnMetrics {
153    fn new(count: &IntCounterVec, col: &'static str) -> Self {
154        ArrowColumnMetrics {
155            correct_count: count.with_label_values(&["validation", col, "correct"]),
156            invalid_count: count.with_label_values(&["validation", col, "invalid"]),
157        }
158    }
159
160    /// Measure and report statistics for validation.
161    pub fn report_valid<F: FnOnce() -> bool>(&self, f: F) -> bool {
162        let is_valid = f();
163        if is_valid {
164            self.correct_count.inc();
165        } else {
166            self.invalid_count.inc();
167        }
168        is_valid
169    }
170}
171
172/// Metrics for a Parquet file that we write to S3.
173#[derive(Debug, Clone)]
174pub struct ParquetMetrics {
175    pub(crate) encoded_size: IntCounterVec,
176    pub(crate) num_row_groups: IntCounterVec,
177    pub(crate) k_metrics: ParquetColumnMetrics,
178    pub(crate) v_metrics: ParquetColumnMetrics,
179    pub(crate) t_metrics: ParquetColumnMetrics,
180    pub(crate) d_metrics: ParquetColumnMetrics,
181    pub(crate) k_s_metrics: ParquetColumnMetrics,
182    pub(crate) v_s_metrics: ParquetColumnMetrics,
183    pub(crate) elided_null_buffers: IntCounter,
184}
185
186impl ParquetMetrics {
187    pub(crate) fn new(registry: &MetricsRegistry) -> Self {
188        let encoded_size: IntCounterVec = registry.register(metric!(
189            name: "mz_persist_parquet_encoded_size",
190            help: "encoded size of a parquet file that we write to S3",
191            var_labels: ["format"],
192        ));
193        let num_row_groups: IntCounterVec = registry.register(metric!(
194            name: "mz_persist_parquet_row_group_count",
195            help: "count of row groups in a parquet file",
196            var_labels: ["format"],
197        ));
198
199        let column_size: IntCounterVec = registry.register(metric!(
200            name: "mz_persist_parquet_column_size",
201            help: "size in bytes of a column within a parquet file",
202            var_labels: ["col", "compressed"],
203        ));
204
205        ParquetMetrics {
206            encoded_size,
207            num_row_groups,
208            k_metrics: ParquetColumnMetrics::new("k", &column_size),
209            v_metrics: ParquetColumnMetrics::new("v", &column_size),
210            t_metrics: ParquetColumnMetrics::new("t", &column_size),
211            d_metrics: ParquetColumnMetrics::new("d", &column_size),
212            k_s_metrics: ParquetColumnMetrics::new("k_s", &column_size),
213            v_s_metrics: ParquetColumnMetrics::new("v_s", &column_size),
214            elided_null_buffers: registry.register(metric!(
215                name: "mz_persist_parquet_elided_null_buffer_count",
216                help: "times we dropped an unnecessary null buffer returned by parquet decoding",
217            )),
218        }
219    }
220}
221
222/// Metrics for a column within a Parquet file that we write to S3.
223#[derive(Debug, Clone)]
224pub struct ParquetColumnMetrics {
225    pub(crate) uncompressed_size: IntCounter,
226    pub(crate) compressed_size: IntCounter,
227}
228
229impl ParquetColumnMetrics {
230    pub(crate) fn new(col: &'static str, size: &IntCounterVec) -> Self {
231        ParquetColumnMetrics {
232            uncompressed_size: size.with_label_values(&[col, "uncompressed"]),
233            compressed_size: size.with_label_values(&[col, "compressed"]),
234        }
235    }
236
237    pub(crate) fn report_sizes(&self, uncompressed: u64, compressed: u64) {
238        self.uncompressed_size.inc_by(uncompressed);
239        self.compressed_size.inc_by(compressed);
240    }
241}
242
243/// Metrics for `ColumnarRecords`.
244#[derive(Debug)]
245pub struct ColumnarMetrics {
246    pub(crate) parquet: ParquetMetrics,
247    pub(crate) arrow: ArrowMetrics,
248}
249
250impl ColumnarMetrics {
251    /// Returns a new [ColumnarMetrics].
252    pub fn new(registry: &MetricsRegistry) -> Self {
253        ColumnarMetrics {
254            parquet: ParquetMetrics::new(registry),
255            arrow: ArrowMetrics::new(registry),
256        }
257    }
258
259    /// Returns a reference to the [`arrow`] metrics for our structured data representation.
260    pub fn arrow(&self) -> &ArrowMetrics {
261        &self.arrow
262    }
263
264    /// Returns a reference to the [`parquet`] metrics for our structured data representation.
265    pub fn parquet(&self) -> &ParquetMetrics {
266        &self.parquet
267    }
268
269    /// Returns a [ColumnarMetrics] disconnected from any metrics registry.
270    ///
271    /// Exposed for testing.
272    pub fn disconnected() -> Self {
273        Self::new(&MetricsRegistry::new())
274    }
275}