1use std::collections::BTreeMap;
13use std::sync::{Arc, Mutex, Weak};
14
15use mz_ore::metric;
16use mz_ore::metrics::{
17 DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec, HistogramVec,
18 IntCounterVec, MetricsRegistry, UIntGaugeVec,
19};
20use mz_ore::stats::histogram_seconds_buckets;
21use mz_repr::GlobalId;
22use mz_rocksdb::RocksDBInstanceMetrics;
23use mz_storage_operators::metrics::BackpressureMetrics;
24use prometheus::core::{AtomicF64, AtomicU64};
25
#[derive(Clone, Debug)]
pub(crate) struct UpsertMetricDefs {
    // Per-worker rehydration progress for upsert state, labeled by
    // ["source_id", "worker_id"].
    pub(crate) rehydration_latency: GaugeVec,
    pub(crate) rehydration_total: UIntGaugeVec,
    pub(crate) rehydration_updates: UIntGaugeVec,

    // Flag gauge: whether upsert state has auto-spilled to RocksDB.
    pub(crate) rocksdb_autospill_in_use: UIntGaugeVec,

    // Upsert-state operation metrics. The `*_latency` histograms are labeled
    // by ["source_id"] only; the counters by ["source_id", "worker_id"].
    pub(crate) merge_snapshot_latency: HistogramVec,
    pub(crate) merge_snapshot_updates: IntCounterVec,
    pub(crate) merge_snapshot_inserts: IntCounterVec,
    pub(crate) merge_snapshot_deletes: IntCounterVec,
    pub(crate) upsert_inserts: IntCounterVec,
    pub(crate) upsert_updates: IntCounterVec,
    pub(crate) upsert_deletes: IntCounterVec,
    pub(crate) multi_get_latency: HistogramVec,
    pub(crate) multi_get_size: IntCounterVec,
    pub(crate) multi_get_result_count: IntCounterVec,
    pub(crate) multi_get_result_bytes: IntCounterVec,
    pub(crate) multi_put_latency: HistogramVec,
    pub(crate) multi_put_size: IntCounterVec,

    // RocksDB-backend counterparts of the metrics above.
    pub(crate) rocksdb_multi_get_latency: HistogramVec,
    pub(crate) rocksdb_multi_get_size: IntCounterVec,
    pub(crate) rocksdb_multi_get_result_count: IntCounterVec,
    pub(crate) rocksdb_multi_get_result_bytes: IntCounterVec,
    pub(crate) rocksdb_multi_get_count: IntCounterVec,
    pub(crate) rocksdb_multi_put_count: IntCounterVec,
    pub(crate) rocksdb_multi_put_latency: HistogramVec,
    pub(crate) rocksdb_multi_put_size: IntCounterVec,
    // Memoized per-source shared metric handles. `Weak` entries let the
    // per-source metrics be dropped (and deleted from the registry) once the
    // last strong handle goes away.
    pub(crate) shared: Arc<Mutex<BTreeMap<GlobalId, Weak<UpsertSharedMetrics>>>>,
    pub(crate) rocksdb_shared:
        Arc<Mutex<BTreeMap<GlobalId, Weak<mz_rocksdb::RocksDBSharedMetrics>>>>,
}
72
73impl UpsertMetricDefs {
74 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
75 let shared = Arc::new(Mutex::new(BTreeMap::new()));
76 let rocksdb_shared = Arc::new(Mutex::new(BTreeMap::new()));
77 Self {
78 rehydration_latency: registry.register(metric!(
79 name: "mz_storage_upsert_state_rehydration_latency",
80 help: "The latency, per-worker, in fractional seconds, \
81 of rehydrating the upsert state for this source",
82 var_labels: ["source_id", "worker_id"],
83 )),
84 rehydration_total: registry.register(metric!(
85 name: "mz_storage_upsert_state_rehydration_total",
86 help: "The number of values \
87 per-worker, rehydrated into the upsert state for \
88 this source",
89 var_labels: ["source_id", "worker_id"],
90 )),
91 rehydration_updates: registry.register(metric!(
92 name: "mz_storage_upsert_state_rehydration_updates",
93 help: "The number of updates (both negative and positive), \
94 per-worker, rehydrated into the upsert state for \
95 this source",
96 var_labels: ["source_id", "worker_id"],
97 )),
98 rocksdb_autospill_in_use: registry.register(metric!(
99 name: "mz_storage_upsert_state_rocksdb_autospill_in_use",
100 help: "Flag to denote whether upsert state has spilled to rocksdb \
101 or using in-memory state",
102 var_labels: ["source_id", "worker_id"],
103 )),
104 merge_snapshot_latency: registry.register(metric!(
106 name: "mz_storage_upsert_merge_snapshot_latency",
107 help: "The latencies, in fractional seconds, \
108 of merging snapshot updates into upsert state for this source. \
109 Specific implementations of upsert state may have more detailed \
110 metrics about sub-batches.",
111 var_labels: ["source_id"],
112 buckets: histogram_seconds_buckets(0.000_500, 32.0),
113 )),
114 merge_snapshot_updates: registry.register(metric!(
115 name: "mz_storage_upsert_merge_snapshot_updates_total",
116 help: "The batch size, \
117 of merging snapshot updates into upsert state for this source. \
118 Specific implementations of upsert state may have more detailed \
119 metrics about sub-batches.",
120 var_labels: ["source_id", "worker_id"],
121 )),
122 merge_snapshot_inserts: registry.register(metric!(
123 name: "mz_storage_upsert_merge_snapshot_inserts_total",
124 help: "The number of inserts in a batch for merging snapshot updates \
125 for this source.",
126 var_labels: ["source_id", "worker_id"],
127 )),
128 merge_snapshot_deletes: registry.register(metric!(
129 name: "mz_storage_upsert_merge_snapshot_deletes_total",
130 help: "The number of deletes in a batch for merging snapshot updates \
131 for this source.",
132 var_labels: ["source_id", "worker_id"],
133 )),
134 upsert_inserts: registry.register(metric!(
135 name: "mz_storage_upsert_inserts_total",
136 help: "The number of inserts done by the upsert operator",
137 var_labels: ["source_id", "worker_id"],
138 )),
139 upsert_updates: registry.register(metric!(
140 name: "mz_storage_upsert_updates_total",
141 help: "The number of updates done by the upsert operator",
142 var_labels: ["source_id", "worker_id"],
143 )),
144 upsert_deletes: registry.register(metric!(
145 name: "mz_storage_upsert_deletes_total",
146 help: "The number of deletes done by the upsert operator.",
147 var_labels: ["source_id", "worker_id"],
148 )),
149 multi_get_latency: registry.register(metric!(
150 name: "mz_storage_upsert_multi_get_latency",
151 help: "The latencies, in fractional seconds, \
152 of getting values from the upsert state for this source. \
153 Specific implementations of upsert state may have more detailed \
154 metrics about sub-batches.",
155 var_labels: ["source_id"],
156 buckets: histogram_seconds_buckets(0.000_500, 32.0),
157 )),
158 multi_get_size: registry.register(metric!(
159 name: "mz_storage_upsert_multi_get_size_total",
160 help: "The batch size, \
161 of getting values from the upsert state for this source. \
162 Specific implementations of upsert state may have more detailed \
163 metrics about sub-batches.",
164 var_labels: ["source_id", "worker_id"],
165 )),
166 multi_get_result_count: registry.register(metric!(
167 name: "mz_storage_upsert_multi_get_result_count_total",
168 help: "The number of non-empty records returned in a multi_get batch. \
169 Specific implementations of upsert state may have more detailed \
170 metrics about sub-batches.",
171 var_labels: ["source_id", "worker_id"],
172 )),
173 multi_get_result_bytes: registry.register(metric!(
174 name: "mz_storage_upsert_multi_get_result_bytes_total",
175 help: "The total size of records returned in a multi_get batch. \
176 Specific implementations of upsert state may have more detailed \
177 metrics about sub-batches.",
178 var_labels: ["source_id", "worker_id"],
179 )),
180 multi_put_latency: registry.register(metric!(
181 name: "mz_storage_upsert_multi_put_latency",
182 help: "The latencies, in fractional seconds, \
183 of getting values into the upsert state for this source. \
184 Specific implementations of upsert state may have more detailed \
185 metrics about sub-batches.",
186 var_labels: ["source_id"],
187 buckets: histogram_seconds_buckets(0.000_500, 32.0),
188 )),
189 multi_put_size: registry.register(metric!(
190 name: "mz_storage_upsert_multi_put_size_total",
191 help: "The batch size, \
192 of getting values into the upsert state for this source. \
193 Specific implementations of upsert state may have more detailed \
194 metrics about sub-batches.",
195 var_labels: ["source_id", "worker_id"],
196 )),
197 shared,
198 rocksdb_multi_get_latency: registry.register(metric!(
199 name: "mz_storage_rocksdb_multi_get_latency",
200 help: "The latencies, in fractional seconds, \
201 of getting batches of values from RocksDB for this source.",
202 var_labels: ["source_id"],
203 buckets: histogram_seconds_buckets(0.000_500, 32.0),
204 )),
205 rocksdb_multi_get_size: registry.register(metric!(
206 name: "mz_storage_rocksdb_multi_get_size_total",
207 help: "The batch size, \
208 of getting batches of values from RocksDB for this source.",
209 var_labels: ["source_id", "worker_id"],
210 )),
211 rocksdb_multi_get_result_count: registry.register(metric!(
212 name: "mz_storage_rocksdb_multi_get_result_count_total",
213 help: "The number of non-empty records returned, \
214 when getting batches of values from RocksDB for this source.",
215 var_labels: ["source_id", "worker_id"],
216 )),
217 rocksdb_multi_get_result_bytes: registry.register(metric!(
218 name: "mz_storage_rocksdb_multi_get_result_bytes_total",
219 help: "The total size of records returned, \
220 when getting batches of values from RocksDB for this source.",
221 var_labels: ["source_id", "worker_id"],
222 )),
223 rocksdb_multi_get_count: registry.register(metric!(
224 name: "mz_storage_rocksdb_multi_get_count_total",
225 help: "The number of calls to rocksdb multi_get.",
226 var_labels: ["source_id", "worker_id"],
227 )),
228 rocksdb_multi_put_count: registry.register(metric!(
229 name: "mz_storage_rocksdb_multi_put_count_total",
230 help: "The number of calls to rocksdb multi_put.",
231 var_labels: ["source_id", "worker_id"],
232 )),
233 rocksdb_multi_put_latency: registry.register(metric!(
234 name: "mz_storage_rocksdb_multi_put_latency",
235 help: "The latencies, in fractional seconds, \
236 of putting batches of values into RocksDB for this source.",
237 var_labels: ["source_id"],
238 buckets: histogram_seconds_buckets(0.000_500, 32.0),
239 )),
240 rocksdb_multi_put_size: registry.register(metric!(
241 name: "mz_storage_rocksdb_multi_put_size_total",
242 help: "The batch size, \
243 of putting batches of values into RocksDB for this source.",
244 var_labels: ["source_id", "worker_id"],
245 )),
246 rocksdb_shared,
247 }
248 }
249
250 pub(crate) fn shared(&self, source_id: &GlobalId) -> Arc<UpsertSharedMetrics> {
252 let mut shared = self.shared.lock().expect("mutex poisoned");
253 if let Some(shared_metrics) = shared.get(source_id) {
254 if let Some(shared_metrics) = shared_metrics.upgrade() {
255 return Arc::clone(&shared_metrics);
256 } else {
257 assert!(shared.remove(source_id).is_some());
258 }
259 }
260 let shared_metrics = Arc::new(UpsertSharedMetrics::new(self, *source_id));
261 assert!(
262 shared
263 .insert(source_id.clone(), Arc::downgrade(&shared_metrics))
264 .is_none()
265 );
266 shared_metrics
267 }
268
269 pub(crate) fn rocksdb_shared(
271 &self,
272 source_id: &GlobalId,
273 ) -> Arc<mz_rocksdb::RocksDBSharedMetrics> {
274 let mut rocksdb = self.rocksdb_shared.lock().expect("mutex poisoned");
275 if let Some(shared_metrics) = rocksdb.get(source_id) {
276 if let Some(shared_metrics) = shared_metrics.upgrade() {
277 return Arc::clone(&shared_metrics);
278 } else {
279 assert!(rocksdb.remove(source_id).is_some());
280 }
281 }
282
283 let rocksdb_metrics = {
284 let source_id = source_id.to_string();
285 mz_rocksdb::RocksDBSharedMetrics {
286 multi_get_latency: self
287 .rocksdb_multi_get_latency
288 .get_delete_on_drop_metric(vec![source_id.clone()]),
289 multi_put_latency: self
290 .rocksdb_multi_put_latency
291 .get_delete_on_drop_metric(vec![source_id.clone()]),
292 }
293 };
294
295 let rocksdb_metrics = Arc::new(rocksdb_metrics);
296 assert!(
297 rocksdb
298 .insert(source_id.clone(), Arc::downgrade(&rocksdb_metrics))
299 .is_none()
300 );
301 rocksdb_metrics
302 }
303}
304
#[derive(Debug)]
pub(crate) struct UpsertSharedMetrics {
    // Latency histograms shared by all workers of a single source (labeled by
    // ["source_id"] only). Dropping the struct deletes the label sets from
    // the registry.
    pub(crate) merge_snapshot_latency: DeleteOnDropHistogram<Vec<String>>,
    pub(crate) multi_get_latency: DeleteOnDropHistogram<Vec<String>>,
    pub(crate) multi_put_latency: DeleteOnDropHistogram<Vec<String>>,
}
312
313impl UpsertSharedMetrics {
314 fn new(metrics: &UpsertMetricDefs, source_id: GlobalId) -> Self {
316 let source_id = source_id.to_string();
317 UpsertSharedMetrics {
318 merge_snapshot_latency: metrics
319 .merge_snapshot_latency
320 .get_delete_on_drop_metric(vec![source_id.clone()]),
321 multi_get_latency: metrics
322 .multi_get_latency
323 .get_delete_on_drop_metric(vec![source_id.clone()]),
324 multi_put_latency: metrics
325 .multi_put_latency
326 .get_delete_on_drop_metric(vec![source_id.clone()]),
327 }
328 }
329}
330
#[derive(Clone, Debug)]
pub(crate) struct UpsertBackpressureMetricDefs {
    // Backpressure accounting for the upsert operator, all labeled by
    // ["source_id", "worker_id"].
    pub(crate) emitted_bytes: IntCounterVec,
    pub(crate) last_backpressured_bytes: UIntGaugeVec,
    pub(crate) retired_bytes: IntCounterVec,
}
338
impl UpsertBackpressureMetricDefs {
    /// Registers the backpressure metric families with `registry`.
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            emitted_bytes: registry.register(metric!(
                name: "mz_storage_upsert_backpressure_emitted_bytes",
                help: "A counter with the number of emitted bytes.",
                var_labels: ["source_id", "worker_id"],
            )),
            last_backpressured_bytes: registry.register(metric!(
                name: "mz_storage_upsert_backpressure_last_backpressured_bytes",
                help: "The last count of bytes we are waiting to be retired in \
                    the operator. This cannot be directly compared to \
                    `retired_bytes`, but CAN indicate that backpressure is happening.",
                var_labels: ["source_id", "worker_id"],
            )),
            retired_bytes: registry.register(metric!(
                name: "mz_storage_upsert_backpressure_retired_bytes",
                help:"A counter with the number of bytes retired by downstream processing.",
                var_labels: ["source_id", "worker_id"],
            )),
        }
    }
}
366
// Per-(source, worker) upsert metric handles; labels are deleted from the
// registry when the corresponding handles are dropped.
pub struct UpsertMetrics {
    pub(crate) rehydration_latency: DeleteOnDropGauge<AtomicF64, Vec<String>>,
    pub(crate) rehydration_total: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) rehydration_updates: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    // Arc-wrapped so the flag can be shared with other tasks/operators.
    pub(crate) rocksdb_autospill_in_use: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,

    pub(crate) merge_snapshot_updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) merge_snapshot_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) merge_snapshot_deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) upsert_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) upsert_updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) upsert_deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_get_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_get_result_bytes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_get_result_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_put_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,

    // Memoized per-source shared handles (see `UpsertMetricDefs`).
    pub(crate) shared: Arc<UpsertSharedMetrics>,
    pub(crate) rocksdb_shared: Arc<mz_rocksdb::RocksDBSharedMetrics>,
    pub(crate) rocksdb_instance_metrics: Arc<mz_rocksdb::RocksDBInstanceMetrics>,
    // Not read here; held only to keep the delete-on-drop backpressure
    // metrics alive for the lifetime of this struct.
    _backpressure_metrics: Option<BackpressureMetrics>,
}
392
393impl UpsertMetrics {
394 pub(crate) fn new(
396 defs: &UpsertMetricDefs,
397 source_id: GlobalId,
398 worker_id: usize,
399 backpressure_metrics: Option<BackpressureMetrics>,
400 ) -> Self {
401 let source_id_s = source_id.to_string();
402 let worker_id = worker_id.to_string();
403 Self {
404 rehydration_latency: defs
405 .rehydration_latency
406 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
407 rehydration_total: defs
408 .rehydration_total
409 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
410 rehydration_updates: defs
411 .rehydration_updates
412 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
413 rocksdb_autospill_in_use: Arc::new(
414 defs.rocksdb_autospill_in_use
415 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
416 ),
417 merge_snapshot_updates: defs
418 .merge_snapshot_updates
419 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
420 merge_snapshot_inserts: defs
421 .merge_snapshot_inserts
422 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
423 merge_snapshot_deletes: defs
424 .merge_snapshot_deletes
425 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
426 upsert_inserts: defs
427 .upsert_inserts
428 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
429 upsert_updates: defs
430 .upsert_updates
431 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
432 upsert_deletes: defs
433 .upsert_deletes
434 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
435 multi_get_size: defs
436 .multi_get_size
437 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
438 multi_get_result_count: defs
439 .multi_get_result_count
440 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
441 multi_get_result_bytes: defs
442 .multi_get_result_bytes
443 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
444 multi_put_size: defs
445 .multi_put_size
446 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
447
448 shared: defs.shared(&source_id),
449 rocksdb_shared: defs.rocksdb_shared(&source_id),
450 rocksdb_instance_metrics: Arc::new(RocksDBInstanceMetrics {
451 multi_get_size: defs
452 .rocksdb_multi_get_size
453 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
454 multi_get_result_count: defs
455 .rocksdb_multi_get_result_count
456 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
457 multi_get_result_bytes: defs
458 .rocksdb_multi_get_result_bytes
459 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
460 multi_get_count: defs
461 .rocksdb_multi_get_count
462 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
463 multi_put_count: defs
464 .rocksdb_multi_put_count
465 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
466 multi_put_size: defs
467 .rocksdb_multi_put_size
468 .get_delete_on_drop_metric(vec![source_id_s, worker_id]),
469 }),
470 _backpressure_metrics: backpressure_metrics,
471 }
472 }
473}