1use std::sync::Arc;
13use std::time::Instant;
14
15use mz_dyncfg::ConfigSet;
16use mz_ore::lgbytes::{LgBytesMetrics, LgBytesOpMetrics};
17use mz_ore::metric;
18use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry};
19use prometheus::IntCounterVec;
20
21#[derive(Debug, Clone)]
23pub struct S3BlobMetrics {
24 pub(crate) operation_timeouts: IntCounter,
25 pub(crate) operation_attempt_timeouts: IntCounter,
26 pub(crate) connect_timeouts: IntCounter,
27 pub(crate) read_timeouts: IntCounter,
28 pub(crate) get_part: IntCounter,
29 pub(crate) get_invalid_resp: IntCounter,
30 pub(crate) set_single: IntCounter,
31 pub(crate) set_multi_create: IntCounter,
32 pub(crate) set_multi_part: IntCounter,
33 pub(crate) set_multi_complete: IntCounter,
34 pub(crate) delete_head: IntCounter,
35 pub(crate) delete_object: IntCounter,
36 pub(crate) list_objects: IntCounter,
37 pub(crate) error_counts: IntCounterVec,
38
39 pub lgbytes: LgBytesMetrics,
43}
44
45impl S3BlobMetrics {
46 pub fn new(registry: &MetricsRegistry) -> Self {
48 let operations: IntCounterVec = registry.register(metric!(
49 name: "mz_persist_s3_operations",
50 help: "number of raw s3 calls on behalf of Blob interface methods",
51 var_labels: ["op"],
52 ));
53 let errors: IntCounterVec = registry.register(metric!(
54 name: "mz_persist_s3_errors",
55 help: "errors",
56 var_labels: ["op", "code"],
57 ));
58 Self {
59 operation_timeouts: registry.register(metric!(
60 name: "mz_persist_s3_operation_timeouts",
61 help: "number of operation timeouts (including retries)",
62 )),
63 operation_attempt_timeouts: registry.register(metric!(
64 name: "mz_persist_s3_operation_attempt_timeouts",
65 help: "number of operation attempt timeouts (within a single retry)",
66 )),
67 connect_timeouts: registry.register(metric!(
68 name: "mz_persist_s3_connect_timeouts",
69 help: "number of timeouts establishing a connection to S3",
70 )),
71 read_timeouts: registry.register(metric!(
72 name: "mz_persist_s3_read_timeouts",
73 help: "number of timeouts waiting on first response byte from S3",
74 )),
75 get_part: operations.with_label_values(&["get_part"]),
76 get_invalid_resp: operations.with_label_values(&["get_invalid_resp"]),
77 set_single: operations.with_label_values(&["set_single"]),
78 set_multi_create: operations.with_label_values(&["set_multi_create"]),
79 set_multi_part: operations.with_label_values(&["set_multi_part"]),
80 set_multi_complete: operations.with_label_values(&["set_multi_complete"]),
81 delete_head: operations.with_label_values(&["delete_head"]),
82 delete_object: operations.with_label_values(&["delete_object"]),
83 list_objects: operations.with_label_values(&["list_objects"]),
84 error_counts: errors,
85 lgbytes: LgBytesMetrics::new(registry),
86 }
87 }
88}
89
90#[derive(Debug, Clone)]
92pub struct ArrowMetrics {
93 pub(crate) key: ArrowColumnMetrics,
94 pub(crate) val: ArrowColumnMetrics,
95 pub(crate) part_build_seconds: Counter,
96 pub(crate) part_build_count: IntCounter,
97 pub(crate) concat_bytes: IntCounter,
98}
99
100impl ArrowMetrics {
101 pub fn new(registry: &MetricsRegistry) -> Self {
103 let op_count: IntCounterVec = registry.register(metric!(
104 name: "mz_persist_columnar_op_count",
105 help: "number of rows we've run the specified op on in our structured columnar format",
106 var_labels: ["op", "column", "result"],
107 ));
108
109 let part_build_seconds: Counter = registry.register(metric!(
110 name: "mz_persist_columnar_part_build_seconds",
111 help: "number of seconds we've spent encoding our structured columnar format",
112 ));
113 let part_build_count: IntCounter = registry.register(metric!(
114 name: "mz_persist_columnar_part_build_count",
115 help: "number of times we've encoded our structured columnar format",
116 ));
117 let concat_bytes: IntCounter = registry.register(metric!(
118 name: "mz_persist_columnar_part_concat_bytes",
119 help: "number of bytes we've copied when concatenating updates",
120 ));
121
122 ArrowMetrics {
123 key: ArrowColumnMetrics::new(&op_count, "key"),
124 val: ArrowColumnMetrics::new(&op_count, "val"),
125 part_build_seconds,
126 part_build_count,
127 concat_bytes,
128 }
129 }
130
131 pub fn key(&self) -> &ArrowColumnMetrics {
133 &self.key
134 }
135
136 pub fn val(&self) -> &ArrowColumnMetrics {
138 &self.val
139 }
140
141 pub fn measure_part_build<R, F: FnOnce() -> R>(&self, f: F) -> R {
143 let start = Instant::now();
144 let r = f();
145 let duration = start.elapsed();
146
147 self.part_build_count.inc();
148 self.part_build_seconds.inc_by(duration.as_secs_f64());
149
150 r
151 }
152}
153
154#[derive(Debug, Clone)]
156pub struct ArrowColumnMetrics {
157 correct_count: IntCounter,
158 invalid_count: IntCounter,
159}
160
161impl ArrowColumnMetrics {
162 fn new(count: &IntCounterVec, col: &'static str) -> Self {
163 ArrowColumnMetrics {
164 correct_count: count.with_label_values(&["validation", col, "correct"]),
165 invalid_count: count.with_label_values(&["validation", col, "invalid"]),
166 }
167 }
168
169 pub fn report_valid<F: FnOnce() -> bool>(&self, f: F) -> bool {
171 let is_valid = f();
172 if is_valid {
173 self.correct_count.inc();
174 } else {
175 self.invalid_count.inc();
176 }
177 is_valid
178 }
179}
180
181#[derive(Debug, Clone)]
183pub struct ParquetMetrics {
184 pub(crate) encoded_size: IntCounterVec,
185 pub(crate) num_row_groups: IntCounterVec,
186 pub(crate) k_metrics: ParquetColumnMetrics,
187 pub(crate) v_metrics: ParquetColumnMetrics,
188 pub(crate) t_metrics: ParquetColumnMetrics,
189 pub(crate) d_metrics: ParquetColumnMetrics,
190 pub(crate) k_s_metrics: ParquetColumnMetrics,
191 pub(crate) v_s_metrics: ParquetColumnMetrics,
192 pub(crate) elided_null_buffers: IntCounter,
193}
194
195impl ParquetMetrics {
196 pub(crate) fn new(registry: &MetricsRegistry) -> Self {
197 let encoded_size: IntCounterVec = registry.register(metric!(
198 name: "mz_persist_parquet_encoded_size",
199 help: "encoded size of a parquet file that we write to S3",
200 var_labels: ["format"],
201 ));
202 let num_row_groups: IntCounterVec = registry.register(metric!(
203 name: "mz_persist_parquet_row_group_count",
204 help: "count of row groups in a parquet file",
205 var_labels: ["format"],
206 ));
207
208 let column_size: IntCounterVec = registry.register(metric!(
209 name: "mz_persist_parquet_column_size",
210 help: "size in bytes of a column within a parquet file",
211 var_labels: ["col", "compressed"],
212 ));
213
214 ParquetMetrics {
215 encoded_size,
216 num_row_groups,
217 k_metrics: ParquetColumnMetrics::new("k", &column_size),
218 v_metrics: ParquetColumnMetrics::new("v", &column_size),
219 t_metrics: ParquetColumnMetrics::new("t", &column_size),
220 d_metrics: ParquetColumnMetrics::new("d", &column_size),
221 k_s_metrics: ParquetColumnMetrics::new("k_s", &column_size),
222 v_s_metrics: ParquetColumnMetrics::new("v_s", &column_size),
223 elided_null_buffers: registry.register(metric!(
224 name: "mz_persist_parquet_elided_null_buffer_count",
225 help: "times we dropped an unnecessary null buffer returned by parquet decoding",
226 )),
227 }
228 }
229}
230
231#[derive(Debug, Clone)]
233pub struct ParquetColumnMetrics {
234 pub(crate) uncompressed_size: IntCounter,
235 pub(crate) compressed_size: IntCounter,
236}
237
238impl ParquetColumnMetrics {
239 pub(crate) fn new(col: &'static str, size: &IntCounterVec) -> Self {
240 ParquetColumnMetrics {
241 uncompressed_size: size.with_label_values(&[col, "uncompressed"]),
242 compressed_size: size.with_label_values(&[col, "compressed"]),
243 }
244 }
245
246 pub(crate) fn report_sizes(&self, uncompressed: u64, compressed: u64) {
247 self.uncompressed_size.inc_by(uncompressed);
248 self.compressed_size.inc_by(compressed);
249 }
250}
251
252#[derive(Debug)]
254pub struct ColumnarMetrics {
255 pub(crate) lgbytes_arrow: LgBytesOpMetrics,
256 pub(crate) parquet: ParquetMetrics,
257 pub(crate) arrow: ArrowMetrics,
258 pub(crate) cfg: Arc<ConfigSet>,
261 pub(crate) is_cc_active: bool,
262}
263
264impl ColumnarMetrics {
265 pub fn new(
267 registry: &MetricsRegistry,
268 lgbytes: &LgBytesMetrics,
269 cfg: Arc<ConfigSet>,
270 is_cc_active: bool,
271 ) -> Self {
272 ColumnarMetrics {
273 parquet: ParquetMetrics::new(registry),
274 arrow: ArrowMetrics::new(registry),
275 lgbytes_arrow: lgbytes.persist_arrow.clone(),
276 cfg,
277 is_cc_active,
278 }
279 }
280
281 pub fn arrow(&self) -> &ArrowMetrics {
283 &self.arrow
284 }
285
286 pub fn parquet(&self) -> &ParquetMetrics {
288 &self.parquet
289 }
290
291 pub fn disconnected() -> Self {
295 let registry = MetricsRegistry::new();
296 let lgbytes = LgBytesMetrics::new(®istry);
297 let cfg = crate::cfg::all_dyn_configs(ConfigSet::default());
298
299 Self::new(®istry, &lgbytes, Arc::new(cfg), false)
300 }
301}