1use std::time::Instant;
13
14use mz_ore::metric;
15use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry};
16use prometheus::IntCounterVec;
17
/// Counters tracking raw S3 activity: timeouts, per-operation call counts, and errors.
///
/// The per-operation fields are children of the `mz_persist_s3_operations`
/// counter vector, one per value of its `op` label.
#[derive(Debug, Clone)]
pub struct S3BlobMetrics {
    /// `mz_persist_s3_operation_timeouts`: operation timeouts (including retries).
    pub(crate) operation_timeouts: IntCounter,
    /// `mz_persist_s3_operation_attempt_timeouts`: operation attempt timeouts
    /// (within a single retry).
    pub(crate) operation_attempt_timeouts: IntCounter,
    /// `mz_persist_s3_connect_timeouts`: timeouts establishing a connection to S3.
    pub(crate) connect_timeouts: IntCounter,
    /// `mz_persist_s3_read_timeouts`: timeouts waiting on the first response byte from S3.
    pub(crate) read_timeouts: IntCounter,
    /// `mz_persist_s3_operations{op="get_part"}`.
    pub(crate) get_part: IntCounter,
    /// `mz_persist_s3_operations{op="get_invalid_resp"}`.
    pub(crate) get_invalid_resp: IntCounter,
    /// `mz_persist_s3_operations{op="set_single"}`.
    pub(crate) set_single: IntCounter,
    /// `mz_persist_s3_operations{op="set_multi_create"}`.
    pub(crate) set_multi_create: IntCounter,
    /// `mz_persist_s3_operations{op="set_multi_part"}`.
    pub(crate) set_multi_part: IntCounter,
    /// `mz_persist_s3_operations{op="set_multi_complete"}`.
    pub(crate) set_multi_complete: IntCounter,
    /// `mz_persist_s3_operations{op="delete_head"}`.
    pub(crate) delete_head: IntCounter,
    /// `mz_persist_s3_operations{op="delete_object"}`.
    pub(crate) delete_object: IntCounter,
    /// `mz_persist_s3_operations{op="list_objects"}`.
    pub(crate) list_objects: IntCounter,
    /// `mz_persist_s3_errors`: the whole counter vector, labeled by `op` and `code`.
    pub(crate) error_counts: IntCounterVec,
}
36
37impl S3BlobMetrics {
38 pub fn new(registry: &MetricsRegistry) -> Self {
40 let operations: IntCounterVec = registry.register(metric!(
41 name: "mz_persist_s3_operations",
42 help: "number of raw s3 calls on behalf of Blob interface methods",
43 var_labels: ["op"],
44 ));
45 let errors: IntCounterVec = registry.register(metric!(
46 name: "mz_persist_s3_errors",
47 help: "errors",
48 var_labels: ["op", "code"],
49 ));
50 Self {
51 operation_timeouts: registry.register(metric!(
52 name: "mz_persist_s3_operation_timeouts",
53 help: "number of operation timeouts (including retries)",
54 )),
55 operation_attempt_timeouts: registry.register(metric!(
56 name: "mz_persist_s3_operation_attempt_timeouts",
57 help: "number of operation attempt timeouts (within a single retry)",
58 )),
59 connect_timeouts: registry.register(metric!(
60 name: "mz_persist_s3_connect_timeouts",
61 help: "number of timeouts establishing a connection to S3",
62 )),
63 read_timeouts: registry.register(metric!(
64 name: "mz_persist_s3_read_timeouts",
65 help: "number of timeouts waiting on first response byte from S3",
66 )),
67 get_part: operations.with_label_values(&["get_part"]),
68 get_invalid_resp: operations.with_label_values(&["get_invalid_resp"]),
69 set_single: operations.with_label_values(&["set_single"]),
70 set_multi_create: operations.with_label_values(&["set_multi_create"]),
71 set_multi_part: operations.with_label_values(&["set_multi_part"]),
72 set_multi_complete: operations.with_label_values(&["set_multi_complete"]),
73 delete_head: operations.with_label_values(&["delete_head"]),
74 delete_object: operations.with_label_values(&["delete_object"]),
75 list_objects: operations.with_label_values(&["list_objects"]),
76 error_counts: errors,
77 }
78 }
79}
80
/// Metrics for the structured columnar (Arrow) format: per-column validation
/// counters plus part-building and concatenation totals.
#[derive(Debug, Clone)]
pub struct ArrowMetrics {
    /// Validation counters for the column labeled `"key"`.
    pub(crate) key: ArrowColumnMetrics,
    /// Validation counters for the column labeled `"val"`.
    pub(crate) val: ArrowColumnMetrics,
    /// `mz_persist_columnar_part_build_seconds`: seconds spent encoding the
    /// structured columnar format.
    pub(crate) part_build_seconds: Counter,
    /// `mz_persist_columnar_part_build_count`: number of times the structured
    /// columnar format has been encoded.
    pub(crate) part_build_count: IntCounter,
    /// `mz_persist_columnar_part_concat_bytes`: bytes copied when concatenating updates.
    pub(crate) concat_bytes: IntCounter,
}
90
91impl ArrowMetrics {
92 pub fn new(registry: &MetricsRegistry) -> Self {
94 let op_count: IntCounterVec = registry.register(metric!(
95 name: "mz_persist_columnar_op_count",
96 help: "number of rows we've run the specified op on in our structured columnar format",
97 var_labels: ["op", "column", "result"],
98 ));
99
100 let part_build_seconds: Counter = registry.register(metric!(
101 name: "mz_persist_columnar_part_build_seconds",
102 help: "number of seconds we've spent encoding our structured columnar format",
103 ));
104 let part_build_count: IntCounter = registry.register(metric!(
105 name: "mz_persist_columnar_part_build_count",
106 help: "number of times we've encoded our structured columnar format",
107 ));
108 let concat_bytes: IntCounter = registry.register(metric!(
109 name: "mz_persist_columnar_part_concat_bytes",
110 help: "number of bytes we've copied when concatenating updates",
111 ));
112
113 ArrowMetrics {
114 key: ArrowColumnMetrics::new(&op_count, "key"),
115 val: ArrowColumnMetrics::new(&op_count, "val"),
116 part_build_seconds,
117 part_build_count,
118 concat_bytes,
119 }
120 }
121
122 pub fn key(&self) -> &ArrowColumnMetrics {
124 &self.key
125 }
126
127 pub fn val(&self) -> &ArrowColumnMetrics {
129 &self.val
130 }
131
132 pub fn measure_part_build<R, F: FnOnce() -> R>(&self, f: F) -> R {
134 let start = Instant::now();
135 let r = f();
136 let duration = start.elapsed();
137
138 self.part_build_count.inc();
139 self.part_build_seconds.inc_by(duration.as_secs_f64());
140
141 r
142 }
143}
144
/// Validation outcome counters for a single column.
///
/// Both counters are children of one counter vector, resolved with the label
/// values `["validation", <column>, <result>]`.
#[derive(Debug, Clone)]
pub struct ArrowColumnMetrics {
    /// Incremented when a reported validation returns `true` (result label `"correct"`).
    correct_count: IntCounter,
    /// Incremented when a reported validation returns `false` (result label `"invalid"`).
    invalid_count: IntCounter,
}
151
152impl ArrowColumnMetrics {
153 fn new(count: &IntCounterVec, col: &'static str) -> Self {
154 ArrowColumnMetrics {
155 correct_count: count.with_label_values(&["validation", col, "correct"]),
156 invalid_count: count.with_label_values(&["validation", col, "invalid"]),
157 }
158 }
159
160 pub fn report_valid<F: FnOnce() -> bool>(&self, f: F) -> bool {
162 let is_valid = f();
163 if is_valid {
164 self.correct_count.inc();
165 } else {
166 self.invalid_count.inc();
167 }
168 is_valid
169 }
170}
171
/// Metrics about Parquet files: whole-file totals plus per-column size counters.
#[derive(Debug, Clone)]
pub struct ParquetMetrics {
    /// `mz_persist_parquet_encoded_size`: encoded size of a parquet file written
    /// to S3, labeled by `format`.
    pub(crate) encoded_size: IntCounterVec,
    /// `mz_persist_parquet_row_group_count`: count of row groups in a parquet
    /// file, labeled by `format`.
    pub(crate) num_row_groups: IntCounterVec,
    /// Size counters for the column labeled `"k"`.
    // NOTE(review): "k"/"v"/"t"/"d" presumably mean key/value/time/diff — confirm.
    pub(crate) k_metrics: ParquetColumnMetrics,
    /// Size counters for the column labeled `"v"`.
    pub(crate) v_metrics: ParquetColumnMetrics,
    /// Size counters for the column labeled `"t"`.
    pub(crate) t_metrics: ParquetColumnMetrics,
    /// Size counters for the column labeled `"d"`.
    pub(crate) d_metrics: ParquetColumnMetrics,
    /// Size counters for the column labeled `"k_s"`.
    // NOTE(review): the "_s" suffix presumably marks the structured variant — confirm.
    pub(crate) k_s_metrics: ParquetColumnMetrics,
    /// Size counters for the column labeled `"v_s"`.
    pub(crate) v_s_metrics: ParquetColumnMetrics,
    /// `mz_persist_parquet_elided_null_buffer_count`: times an unnecessary null
    /// buffer returned by parquet decoding was dropped.
    pub(crate) elided_null_buffers: IntCounter,
}
185
186impl ParquetMetrics {
187 pub(crate) fn new(registry: &MetricsRegistry) -> Self {
188 let encoded_size: IntCounterVec = registry.register(metric!(
189 name: "mz_persist_parquet_encoded_size",
190 help: "encoded size of a parquet file that we write to S3",
191 var_labels: ["format"],
192 ));
193 let num_row_groups: IntCounterVec = registry.register(metric!(
194 name: "mz_persist_parquet_row_group_count",
195 help: "count of row groups in a parquet file",
196 var_labels: ["format"],
197 ));
198
199 let column_size: IntCounterVec = registry.register(metric!(
200 name: "mz_persist_parquet_column_size",
201 help: "size in bytes of a column within a parquet file",
202 var_labels: ["col", "compressed"],
203 ));
204
205 ParquetMetrics {
206 encoded_size,
207 num_row_groups,
208 k_metrics: ParquetColumnMetrics::new("k", &column_size),
209 v_metrics: ParquetColumnMetrics::new("v", &column_size),
210 t_metrics: ParquetColumnMetrics::new("t", &column_size),
211 d_metrics: ParquetColumnMetrics::new("d", &column_size),
212 k_s_metrics: ParquetColumnMetrics::new("k_s", &column_size),
213 v_s_metrics: ParquetColumnMetrics::new("v_s", &column_size),
214 elided_null_buffers: registry.register(metric!(
215 name: "mz_persist_parquet_elided_null_buffer_count",
216 help: "times we dropped an unnecessary null buffer returned by parquet decoding",
217 )),
218 }
219 }
220}
221
/// Running byte totals for a single Parquet column, before and after compression.
#[derive(Debug, Clone)]
pub struct ParquetColumnMetrics {
    /// Accumulated uncompressed bytes (second label value `"uncompressed"`).
    pub(crate) uncompressed_size: IntCounter,
    /// Accumulated compressed bytes (second label value `"compressed"`).
    pub(crate) compressed_size: IntCounter,
}
228
229impl ParquetColumnMetrics {
230 pub(crate) fn new(col: &'static str, size: &IntCounterVec) -> Self {
231 ParquetColumnMetrics {
232 uncompressed_size: size.with_label_values(&[col, "uncompressed"]),
233 compressed_size: size.with_label_values(&[col, "compressed"]),
234 }
235 }
236
237 pub(crate) fn report_sizes(&self, uncompressed: u64, compressed: u64) {
238 self.uncompressed_size.inc_by(uncompressed);
239 self.compressed_size.inc_by(compressed);
240 }
241}
242
/// Bundle of the Parquet and Arrow metric groups.
#[derive(Debug)]
pub struct ColumnarMetrics {
    /// Parquet file and column size metrics.
    pub(crate) parquet: ParquetMetrics,
    /// Arrow validation and part-building metrics.
    pub(crate) arrow: ArrowMetrics,
}
249
250impl ColumnarMetrics {
251 pub fn new(registry: &MetricsRegistry) -> Self {
253 ColumnarMetrics {
254 parquet: ParquetMetrics::new(registry),
255 arrow: ArrowMetrics::new(registry),
256 }
257 }
258
259 pub fn arrow(&self) -> &ArrowMetrics {
261 &self.arrow
262 }
263
264 pub fn parquet(&self) -> &ParquetMetrics {
266 &self.parquet
267 }
268
269 pub fn disconnected() -> Self {
273 Self::new(&MetricsRegistry::new())
274 }
275}