1use std::sync::Arc;
13use std::time::Instant;
14
15use mz_dyncfg::ConfigSet;
16use mz_ore::lgbytes::{LgBytesMetrics, LgBytesOpMetrics};
17use mz_ore::metric;
18use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry};
19use prometheus::{CounterVec, IntCounterVec};
20
/// Metrics for the S3-backed `Blob` implementation.
///
/// All handles are created by [`S3BlobMetrics::new`]. The per-call counters are children of
/// the `mz_persist_s3_operations` vec, distinguished by their `op` label.
#[derive(Debug, Clone)]
pub struct S3BlobMetrics {
    /// Operation timeouts, counted including retries (`mz_persist_s3_operation_timeouts`).
    pub(crate) operation_timeouts: IntCounter,
    /// Timeouts of a single operation attempt (`mz_persist_s3_operation_attempt_timeouts`).
    pub(crate) operation_attempt_timeouts: IntCounter,
    /// Timeouts while establishing a connection to S3 (`mz_persist_s3_connect_timeouts`).
    pub(crate) connect_timeouts: IntCounter,
    /// Timeouts waiting on the first response byte from S3 (`mz_persist_s3_read_timeouts`).
    pub(crate) read_timeouts: IntCounter,
    /// Raw S3 calls with `op="get_part"`.
    pub(crate) get_part: IntCounter,
    /// Raw S3 calls with `op="get_invalid_resp"`.
    pub(crate) get_invalid_resp: IntCounter,
    /// Raw S3 calls with `op="set_single"`.
    pub(crate) set_single: IntCounter,
    /// Raw S3 calls with `op="set_multi_create"`.
    pub(crate) set_multi_create: IntCounter,
    /// Raw S3 calls with `op="set_multi_part"`.
    pub(crate) set_multi_part: IntCounter,
    /// Raw S3 calls with `op="set_multi_complete"`.
    pub(crate) set_multi_complete: IntCounter,
    /// Raw S3 calls with `op="delete_head"`.
    pub(crate) delete_head: IntCounter,
    /// Raw S3 calls with `op="delete_object"`.
    pub(crate) delete_object: IntCounter,
    /// Raw S3 calls with `op="list_objects"`.
    pub(crate) list_objects: IntCounter,
    /// Error counts keyed by `op` and `code` (`mz_persist_s3_errors`); callers pick labels.
    pub(crate) error_counts: IntCounterVec,

    /// Byte-buffer metrics built with `LgBytesMetrics::new` on the same registry.
    pub lgbytes: LgBytesMetrics,
}
44
45impl S3BlobMetrics {
46 pub fn new(registry: &MetricsRegistry) -> Self {
48 let operations: IntCounterVec = registry.register(metric!(
49 name: "mz_persist_s3_operations",
50 help: "number of raw s3 calls on behalf of Blob interface methods",
51 var_labels: ["op"],
52 ));
53 let errors: IntCounterVec = registry.register(metric!(
54 name: "mz_persist_s3_errors",
55 help: "errors",
56 var_labels: ["op", "code"],
57 ));
58 Self {
59 operation_timeouts: registry.register(metric!(
60 name: "mz_persist_s3_operation_timeouts",
61 help: "number of operation timeouts (including retries)",
62 )),
63 operation_attempt_timeouts: registry.register(metric!(
64 name: "mz_persist_s3_operation_attempt_timeouts",
65 help: "number of operation attempt timeouts (within a single retry)",
66 )),
67 connect_timeouts: registry.register(metric!(
68 name: "mz_persist_s3_connect_timeouts",
69 help: "number of timeouts establishing a connection to S3",
70 )),
71 read_timeouts: registry.register(metric!(
72 name: "mz_persist_s3_read_timeouts",
73 help: "number of timeouts waiting on first response byte from S3",
74 )),
75 get_part: operations.with_label_values(&["get_part"]),
76 get_invalid_resp: operations.with_label_values(&["get_invalid_resp"]),
77 set_single: operations.with_label_values(&["set_single"]),
78 set_multi_create: operations.with_label_values(&["set_multi_create"]),
79 set_multi_part: operations.with_label_values(&["set_multi_part"]),
80 set_multi_complete: operations.with_label_values(&["set_multi_complete"]),
81 delete_head: operations.with_label_values(&["delete_head"]),
82 delete_object: operations.with_label_values(&["delete_object"]),
83 list_objects: operations.with_label_values(&["list_objects"]),
84 error_counts: errors,
85 lgbytes: LgBytesMetrics::new(registry),
86 }
87 }
88}
89
/// Metrics for the structured columnar (Arrow) encoding.
#[derive(Debug, Clone)]
pub struct ArrowMetrics {
    /// Per-column metrics for the `key` column (label `column="key"`).
    pub(crate) key: ArrowColumnMetrics,
    /// Per-column metrics for the `val` column (label `column="val"`).
    pub(crate) val: ArrowColumnMetrics,
    /// Total seconds spent inside `measure_part_build` closures.
    pub(crate) part_build_seconds: Counter,
    /// Number of `measure_part_build` invocations.
    pub(crate) part_build_count: IntCounter,
    /// Bytes copied when concatenating updates (`mz_persist_columnar_part_concat_bytes`).
    pub(crate) concat_bytes: IntCounter,
}
99
100impl ArrowMetrics {
101 pub fn new(registry: &MetricsRegistry) -> Self {
103 let op_count: IntCounterVec = registry.register(metric!(
104 name: "mz_persist_columnar_op_count",
105 help: "number of rows we've run the specified op on in our structured columnar format",
106 var_labels: ["op", "column", "result"],
107 ));
108 let op_seconds: CounterVec = registry.register(metric!(
109 name: "mz_persist_columnar_op_seconds",
110 help: "numer of seconds we've spent running the specified op on our structured columnar format",
111 var_labels: ["op", "column"],
112 ));
113
114 let part_build_seconds: Counter = registry.register(metric!(
115 name: "mz_persist_columnar_part_build_seconds",
116 help: "number of seconds we've spent encoding our structured columnar format",
117 ));
118 let part_build_count: IntCounter = registry.register(metric!(
119 name: "mz_persist_columnar_part_build_count",
120 help: "number of times we've encoded our structured columnar format",
121 ));
122 let concat_bytes: IntCounter = registry.register(metric!(
123 name: "mz_persist_columnar_part_concat_bytes",
124 help: "number of bytes we've copied when concatenating updates",
125 ));
126
127 ArrowMetrics {
128 key: ArrowColumnMetrics::new(&op_count, &op_seconds, "key"),
129 val: ArrowColumnMetrics::new(&op_count, &op_seconds, "val"),
130 part_build_seconds,
131 part_build_count,
132 concat_bytes,
133 }
134 }
135
136 pub fn key(&self) -> &ArrowColumnMetrics {
138 &self.key
139 }
140
141 pub fn val(&self) -> &ArrowColumnMetrics {
143 &self.val
144 }
145
146 pub fn measure_part_build<R, F: FnOnce() -> R>(&self, f: F) -> R {
148 let start = Instant::now();
149 let r = f();
150 let duration = start.elapsed();
151
152 self.part_build_count.inc();
153 self.part_build_seconds.inc_by(duration.as_secs_f64());
154
155 r
156 }
157}
158
/// Metric handles for a single Arrow column (for example `key` or `val`).
#[derive(Debug, Clone)]
pub struct ArrowColumnMetrics {
    /// Decode calls (`op="decode"`, `result="success"`).
    decoding_count: IntCounter,
    /// Seconds spent decoding (`op="decode"`).
    decoding_seconds: Counter,
    /// Validations that reported `true` (`op="validation"`, `result="correct"`).
    correct_count: IntCounter,
    /// Validations that reported `false` (`op="validation"`, `result="invalid"`).
    invalid_count: IntCounter,
}
167
168impl ArrowColumnMetrics {
169 fn new(count: &IntCounterVec, duration: &CounterVec, col: &'static str) -> Self {
170 ArrowColumnMetrics {
171 decoding_count: count.with_label_values(&["decode", col, "success"]),
172 decoding_seconds: duration.with_label_values(&["decode", col]),
173 correct_count: count.with_label_values(&["validation", col, "correct"]),
174 invalid_count: count.with_label_values(&["validation", col, "invalid"]),
175 }
176 }
177
178 pub fn measure_decoding<R, F: FnOnce() -> R>(&self, decode: F) -> R {
180 let start = Instant::now();
181 let result = decode();
182 let duration = start.elapsed();
183
184 self.decoding_count.inc();
185 self.decoding_seconds.inc_by(duration.as_secs_f64());
186
187 result
188 }
189
190 pub fn report_valid<F: FnOnce() -> bool>(&self, f: F) -> bool {
192 let is_valid = f();
193 if is_valid {
194 self.correct_count.inc();
195 } else {
196 self.invalid_count.inc();
197 }
198 is_valid
199 }
200}
201
/// Metrics for Parquet files written and read by persist.
#[derive(Debug, Clone)]
pub struct ParquetMetrics {
    /// Encoded file size, keyed by a `format` label chosen by callers.
    pub(crate) encoded_size: IntCounterVec,
    /// Row-group counts per file, keyed by a `format` label chosen by callers.
    pub(crate) num_row_groups: IntCounterVec,
    /// Column-size metrics for the column labeled `k`.
    pub(crate) k_metrics: ParquetColumnMetrics,
    /// Column-size metrics for the column labeled `v`.
    pub(crate) v_metrics: ParquetColumnMetrics,
    /// Column-size metrics for the column labeled `t`.
    pub(crate) t_metrics: ParquetColumnMetrics,
    /// Column-size metrics for the column labeled `d`.
    pub(crate) d_metrics: ParquetColumnMetrics,
    /// Column-size metrics for the column labeled `k_s`.
    pub(crate) k_s_metrics: ParquetColumnMetrics,
    /// Column-size metrics for the column labeled `v_s`.
    pub(crate) v_s_metrics: ParquetColumnMetrics,
    /// Times an unnecessary null buffer from parquet decoding was dropped.
    pub(crate) elided_null_buffers: IntCounter,
}
215
216impl ParquetMetrics {
217 pub(crate) fn new(registry: &MetricsRegistry) -> Self {
218 let encoded_size: IntCounterVec = registry.register(metric!(
219 name: "mz_persist_parquet_encoded_size",
220 help: "encoded size of a parquet file that we write to S3",
221 var_labels: ["format"],
222 ));
223 let num_row_groups: IntCounterVec = registry.register(metric!(
224 name: "mz_persist_parquet_row_group_count",
225 help: "count of row groups in a parquet file",
226 var_labels: ["format"],
227 ));
228
229 let column_size: IntCounterVec = registry.register(metric!(
230 name: "mz_persist_parquet_column_size",
231 help: "size in bytes of a column within a parquet file",
232 var_labels: ["col", "compressed"],
233 ));
234
235 ParquetMetrics {
236 encoded_size,
237 num_row_groups,
238 k_metrics: ParquetColumnMetrics::new("k", &column_size),
239 v_metrics: ParquetColumnMetrics::new("v", &column_size),
240 t_metrics: ParquetColumnMetrics::new("t", &column_size),
241 d_metrics: ParquetColumnMetrics::new("d", &column_size),
242 k_s_metrics: ParquetColumnMetrics::new("k_s", &column_size),
243 v_s_metrics: ParquetColumnMetrics::new("v_s", &column_size),
244 elided_null_buffers: registry.register(metric!(
245 name: "mz_persist_parquet_elided_null_buffer_count",
246 help: "times we dropped an unnecessary null buffer returned by parquet decoding",
247 )),
248 }
249 }
250}
251
/// Size counters for one Parquet column, with and without compression.
#[derive(Debug, Clone)]
pub struct ParquetColumnMetrics {
    /// Uncompressed bytes for this column (label `compressed="uncompressed"`).
    pub(crate) uncompressed_size: IntCounter,
    /// Compressed bytes for this column (label `compressed="compressed"`).
    pub(crate) compressed_size: IntCounter,
}
258
259impl ParquetColumnMetrics {
260 pub(crate) fn new(col: &'static str, size: &IntCounterVec) -> Self {
261 ParquetColumnMetrics {
262 uncompressed_size: size.with_label_values(&[col, "uncompressed"]),
263 compressed_size: size.with_label_values(&[col, "compressed"]),
264 }
265 }
266
267 pub(crate) fn report_sizes(&self, uncompressed: u64, compressed: u64) {
268 self.uncompressed_size.inc_by(uncompressed);
269 self.compressed_size.inc_by(compressed);
270 }
271}
272
/// Bundle of the metrics and configuration used by persist's columnar encodings.
#[derive(Debug)]
pub struct ColumnarMetrics {
    /// A clone of `LgBytesMetrics::persist_arrow`, taken in `ColumnarMetrics::new`.
    pub(crate) lgbytes_arrow: LgBytesOpMetrics,
    /// Parquet file and column metrics.
    pub(crate) parquet: ParquetMetrics,
    /// Arrow (structured columnar) metrics.
    pub(crate) arrow: ArrowMetrics,
    /// Shared dynamic configuration; `disconnected` builds it with `crate::cfg::all_dyn_configs`.
    pub(crate) cfg: Arc<ConfigSet>,
    /// Flag passed through from the constructor; `disconnected` sets it to `false`.
    /// NOTE(review): the meaning of "cc" is not visible here — confirm with callers.
    pub(crate) is_cc_active: bool,
}
284
285impl ColumnarMetrics {
286 pub fn new(
288 registry: &MetricsRegistry,
289 lgbytes: &LgBytesMetrics,
290 cfg: Arc<ConfigSet>,
291 is_cc_active: bool,
292 ) -> Self {
293 ColumnarMetrics {
294 parquet: ParquetMetrics::new(registry),
295 arrow: ArrowMetrics::new(registry),
296 lgbytes_arrow: lgbytes.persist_arrow.clone(),
297 cfg,
298 is_cc_active,
299 }
300 }
301
302 pub fn arrow(&self) -> &ArrowMetrics {
304 &self.arrow
305 }
306
307 pub fn parquet(&self) -> &ParquetMetrics {
309 &self.parquet
310 }
311
312 pub fn disconnected() -> Self {
316 let registry = MetricsRegistry::new();
317 let lgbytes = LgBytesMetrics::new(®istry);
318 let cfg = crate::cfg::all_dyn_configs(ConfigSet::default());
319
320 Self::new(®istry, &lgbytes, Arc::new(cfg), false)
321 }
322}