1use std::collections::BTreeMap;
13use std::sync::{Arc, Mutex, Weak};
14
15use mz_ore::metric;
16use mz_ore::metrics::{
17 DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec, HistogramVec,
18 IntCounterVec, MetricsRegistry, UIntGaugeVec,
19};
20use mz_ore::stats::histogram_seconds_buckets;
21use mz_repr::GlobalId;
22use mz_rocksdb::RocksDBInstanceMetrics;
23use mz_storage_operators::metrics::BackpressureMetrics;
24use prometheus::core::{AtomicF64, AtomicU64};
25
/// Prometheus metric *definitions* (label-vector families) for the upsert
/// operator. These are registered once per process; concrete per-source /
/// per-worker instances are derived from them (see `UpsertMetrics`).
#[derive(Clone, Debug)]
pub(crate) struct UpsertMetricDefs {
    // Rehydration progress, labeled by (source_id, worker_id).
    pub(crate) rehydration_latency: GaugeVec,
    pub(crate) rehydration_total: UIntGaugeVec,
    pub(crate) rehydration_updates: UIntGaugeVec,

    // Upsert-state operation families: latency histograms are labeled by
    // source_id only (shared across workers), counters by
    // (source_id, worker_id).
    pub(crate) merge_snapshot_latency: HistogramVec,
    pub(crate) merge_snapshot_updates: IntCounterVec,
    pub(crate) merge_snapshot_inserts: IntCounterVec,
    pub(crate) merge_snapshot_deletes: IntCounterVec,
    pub(crate) upsert_inserts: IntCounterVec,
    pub(crate) upsert_updates: IntCounterVec,
    pub(crate) upsert_deletes: IntCounterVec,
    pub(crate) multi_get_latency: HistogramVec,
    pub(crate) multi_get_size: IntCounterVec,
    pub(crate) multi_get_result_count: IntCounterVec,
    pub(crate) multi_get_result_bytes: IntCounterVec,
    pub(crate) multi_put_latency: HistogramVec,
    pub(crate) multi_put_size: IntCounterVec,

    // RocksDB-backend counterparts of the families above.
    pub(crate) rocksdb_multi_get_latency: HistogramVec,
    pub(crate) rocksdb_multi_get_size: IntCounterVec,
    pub(crate) rocksdb_multi_get_result_count: IntCounterVec,
    pub(crate) rocksdb_multi_get_result_bytes: IntCounterVec,
    pub(crate) rocksdb_multi_get_count: IntCounterVec,
    pub(crate) rocksdb_multi_put_count: IntCounterVec,
    pub(crate) rocksdb_multi_put_latency: HistogramVec,
    pub(crate) rocksdb_multi_put_size: IntCounterVec,
    // Cache of per-source shared metrics, keyed by source id. Stored as
    // `Weak` so entries do not keep metrics alive after the last source
    // instance drops them; dead entries are evicted on next access.
    pub(crate) shared: Arc<Mutex<BTreeMap<GlobalId, Weak<UpsertSharedMetrics>>>>,
    // Same caching scheme for the RocksDB shared metrics.
    pub(crate) rocksdb_shared:
        Arc<Mutex<BTreeMap<GlobalId, Weak<mz_rocksdb::RocksDBSharedMetrics>>>>,
}
68
69impl UpsertMetricDefs {
70 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
71 let shared = Arc::new(Mutex::new(BTreeMap::new()));
72 let rocksdb_shared = Arc::new(Mutex::new(BTreeMap::new()));
73 Self {
74 rehydration_latency: registry.register(metric!(
75 name: "mz_storage_upsert_state_rehydration_latency",
76 help: "The latency, per-worker, in fractional seconds, \
77 of rehydrating the upsert state for this source",
78 var_labels: ["source_id", "worker_id"],
79 )),
80 rehydration_total: registry.register(metric!(
81 name: "mz_storage_upsert_state_rehydration_total",
82 help: "The number of values \
83 per-worker, rehydrated into the upsert state for \
84 this source",
85 var_labels: ["source_id", "worker_id"],
86 )),
87 rehydration_updates: registry.register(metric!(
88 name: "mz_storage_upsert_state_rehydration_updates",
89 help: "The number of updates (both negative and positive), \
90 per-worker, rehydrated into the upsert state for \
91 this source",
92 var_labels: ["source_id", "worker_id"],
93 )),
94 merge_snapshot_latency: registry.register(metric!(
96 name: "mz_storage_upsert_merge_snapshot_latency",
97 help: "The latencies, in fractional seconds, \
98 of merging snapshot updates into upsert state for this source. \
99 Specific implementations of upsert state may have more detailed \
100 metrics about sub-batches.",
101 var_labels: ["source_id"],
102 buckets: histogram_seconds_buckets(0.000_500, 32.0),
103 )),
104 merge_snapshot_updates: registry.register(metric!(
105 name: "mz_storage_upsert_merge_snapshot_updates_total",
106 help: "The batch size, \
107 of merging snapshot updates into upsert state for this source. \
108 Specific implementations of upsert state may have more detailed \
109 metrics about sub-batches.",
110 var_labels: ["source_id", "worker_id"],
111 )),
112 merge_snapshot_inserts: registry.register(metric!(
113 name: "mz_storage_upsert_merge_snapshot_inserts_total",
114 help: "The number of inserts in a batch for merging snapshot updates \
115 for this source.",
116 var_labels: ["source_id", "worker_id"],
117 )),
118 merge_snapshot_deletes: registry.register(metric!(
119 name: "mz_storage_upsert_merge_snapshot_deletes_total",
120 help: "The number of deletes in a batch for merging snapshot updates \
121 for this source.",
122 var_labels: ["source_id", "worker_id"],
123 )),
124 upsert_inserts: registry.register(metric!(
125 name: "mz_storage_upsert_inserts_total",
126 help: "The number of inserts done by the upsert operator",
127 var_labels: ["source_id", "worker_id"],
128 )),
129 upsert_updates: registry.register(metric!(
130 name: "mz_storage_upsert_updates_total",
131 help: "The number of updates done by the upsert operator",
132 var_labels: ["source_id", "worker_id"],
133 )),
134 upsert_deletes: registry.register(metric!(
135 name: "mz_storage_upsert_deletes_total",
136 help: "The number of deletes done by the upsert operator.",
137 var_labels: ["source_id", "worker_id"],
138 )),
139 multi_get_latency: registry.register(metric!(
140 name: "mz_storage_upsert_multi_get_latency",
141 help: "The latencies, in fractional seconds, \
142 of getting values from the upsert state for this source. \
143 Specific implementations of upsert state may have more detailed \
144 metrics about sub-batches.",
145 var_labels: ["source_id"],
146 buckets: histogram_seconds_buckets(0.000_500, 32.0),
147 )),
148 multi_get_size: registry.register(metric!(
149 name: "mz_storage_upsert_multi_get_size_total",
150 help: "The batch size, \
151 of getting values from the upsert state for this source. \
152 Specific implementations of upsert state may have more detailed \
153 metrics about sub-batches.",
154 var_labels: ["source_id", "worker_id"],
155 )),
156 multi_get_result_count: registry.register(metric!(
157 name: "mz_storage_upsert_multi_get_result_count_total",
158 help: "The number of non-empty records returned in a multi_get batch. \
159 Specific implementations of upsert state may have more detailed \
160 metrics about sub-batches.",
161 var_labels: ["source_id", "worker_id"],
162 )),
163 multi_get_result_bytes: registry.register(metric!(
164 name: "mz_storage_upsert_multi_get_result_bytes_total",
165 help: "The total size of records returned in a multi_get batch. \
166 Specific implementations of upsert state may have more detailed \
167 metrics about sub-batches.",
168 var_labels: ["source_id", "worker_id"],
169 )),
170 multi_put_latency: registry.register(metric!(
171 name: "mz_storage_upsert_multi_put_latency",
172 help: "The latencies, in fractional seconds, \
173 of getting values into the upsert state for this source. \
174 Specific implementations of upsert state may have more detailed \
175 metrics about sub-batches.",
176 var_labels: ["source_id"],
177 buckets: histogram_seconds_buckets(0.000_500, 32.0),
178 )),
179 multi_put_size: registry.register(metric!(
180 name: "mz_storage_upsert_multi_put_size_total",
181 help: "The batch size, \
182 of getting values into the upsert state for this source. \
183 Specific implementations of upsert state may have more detailed \
184 metrics about sub-batches.",
185 var_labels: ["source_id", "worker_id"],
186 )),
187 shared,
188 rocksdb_multi_get_latency: registry.register(metric!(
189 name: "mz_storage_rocksdb_multi_get_latency",
190 help: "The latencies, in fractional seconds, \
191 of getting batches of values from RocksDB for this source.",
192 var_labels: ["source_id"],
193 buckets: histogram_seconds_buckets(0.000_500, 32.0),
194 )),
195 rocksdb_multi_get_size: registry.register(metric!(
196 name: "mz_storage_rocksdb_multi_get_size_total",
197 help: "The batch size, \
198 of getting batches of values from RocksDB for this source.",
199 var_labels: ["source_id", "worker_id"],
200 )),
201 rocksdb_multi_get_result_count: registry.register(metric!(
202 name: "mz_storage_rocksdb_multi_get_result_count_total",
203 help: "The number of non-empty records returned, \
204 when getting batches of values from RocksDB for this source.",
205 var_labels: ["source_id", "worker_id"],
206 )),
207 rocksdb_multi_get_result_bytes: registry.register(metric!(
208 name: "mz_storage_rocksdb_multi_get_result_bytes_total",
209 help: "The total size of records returned, \
210 when getting batches of values from RocksDB for this source.",
211 var_labels: ["source_id", "worker_id"],
212 )),
213 rocksdb_multi_get_count: registry.register(metric!(
214 name: "mz_storage_rocksdb_multi_get_count_total",
215 help: "The number of calls to rocksdb multi_get.",
216 var_labels: ["source_id", "worker_id"],
217 )),
218 rocksdb_multi_put_count: registry.register(metric!(
219 name: "mz_storage_rocksdb_multi_put_count_total",
220 help: "The number of calls to rocksdb multi_put.",
221 var_labels: ["source_id", "worker_id"],
222 )),
223 rocksdb_multi_put_latency: registry.register(metric!(
224 name: "mz_storage_rocksdb_multi_put_latency",
225 help: "The latencies, in fractional seconds, \
226 of putting batches of values into RocksDB for this source.",
227 var_labels: ["source_id"],
228 buckets: histogram_seconds_buckets(0.000_500, 32.0),
229 )),
230 rocksdb_multi_put_size: registry.register(metric!(
231 name: "mz_storage_rocksdb_multi_put_size_total",
232 help: "The batch size, \
233 of putting batches of values into RocksDB for this source.",
234 var_labels: ["source_id", "worker_id"],
235 )),
236 rocksdb_shared,
237 }
238 }
239
240 pub(crate) fn shared(&self, source_id: &GlobalId) -> Arc<UpsertSharedMetrics> {
242 let mut shared = self.shared.lock().expect("mutex poisoned");
243 if let Some(shared_metrics) = shared.get(source_id) {
244 if let Some(shared_metrics) = shared_metrics.upgrade() {
245 return Arc::clone(&shared_metrics);
246 } else {
247 assert!(shared.remove(source_id).is_some());
248 }
249 }
250 let shared_metrics = Arc::new(UpsertSharedMetrics::new(self, *source_id));
251 assert!(
252 shared
253 .insert(source_id.clone(), Arc::downgrade(&shared_metrics))
254 .is_none()
255 );
256 shared_metrics
257 }
258
259 pub(crate) fn rocksdb_shared(
261 &self,
262 source_id: &GlobalId,
263 ) -> Arc<mz_rocksdb::RocksDBSharedMetrics> {
264 let mut rocksdb = self.rocksdb_shared.lock().expect("mutex poisoned");
265 if let Some(shared_metrics) = rocksdb.get(source_id) {
266 if let Some(shared_metrics) = shared_metrics.upgrade() {
267 return Arc::clone(&shared_metrics);
268 } else {
269 assert!(rocksdb.remove(source_id).is_some());
270 }
271 }
272
273 let rocksdb_metrics = {
274 let source_id = source_id.to_string();
275 mz_rocksdb::RocksDBSharedMetrics {
276 multi_get_latency: self
277 .rocksdb_multi_get_latency
278 .get_delete_on_drop_metric(vec![source_id.clone()]),
279 multi_put_latency: self
280 .rocksdb_multi_put_latency
281 .get_delete_on_drop_metric(vec![source_id.clone()]),
282 }
283 };
284
285 let rocksdb_metrics = Arc::new(rocksdb_metrics);
286 assert!(
287 rocksdb
288 .insert(source_id.clone(), Arc::downgrade(&rocksdb_metrics))
289 .is_none()
290 );
291 rocksdb_metrics
292 }
293}
294
/// Per-source metrics shared between all workers of a source instance
/// (histograms labeled by source id only). Handed out as `Arc`s via
/// `UpsertMetricDefs::shared` and dropped (deregistering the series) once the
/// last holder goes away.
#[derive(Debug)]
pub(crate) struct UpsertSharedMetrics {
    pub(crate) merge_snapshot_latency: DeleteOnDropHistogram<Vec<String>>,
    pub(crate) multi_get_latency: DeleteOnDropHistogram<Vec<String>>,
    pub(crate) multi_put_latency: DeleteOnDropHistogram<Vec<String>>,
}
302
303impl UpsertSharedMetrics {
304 fn new(metrics: &UpsertMetricDefs, source_id: GlobalId) -> Self {
306 let source_id = source_id.to_string();
307 UpsertSharedMetrics {
308 merge_snapshot_latency: metrics
309 .merge_snapshot_latency
310 .get_delete_on_drop_metric(vec![source_id.clone()]),
311 multi_get_latency: metrics
312 .multi_get_latency
313 .get_delete_on_drop_metric(vec![source_id.clone()]),
314 multi_put_latency: metrics
315 .multi_put_latency
316 .get_delete_on_drop_metric(vec![source_id.clone()]),
317 }
318 }
319}
320
/// Metric families for the upsert backpressure operator, labeled by
/// (source_id, worker_id). Registered once per process.
#[derive(Clone, Debug)]
pub(crate) struct UpsertBackpressureMetricDefs {
    pub(crate) emitted_bytes: IntCounterVec,
    pub(crate) last_backpressured_bytes: UIntGaugeVec,
    pub(crate) retired_bytes: IntCounterVec,
}
328
329impl UpsertBackpressureMetricDefs {
330 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
331 Self {
336 emitted_bytes: registry.register(metric!(
337 name: "mz_storage_upsert_backpressure_emitted_bytes",
338 help: "A counter with the number of emitted bytes.",
339 var_labels: ["source_id", "worker_id"],
340 )),
341 last_backpressured_bytes: registry.register(metric!(
342 name: "mz_storage_upsert_backpressure_last_backpressured_bytes",
343 help: "The last count of bytes we are waiting to be retired in \
344 the operator. This cannot be directly compared to \
345 `retired_bytes`, but CAN indicate that backpressure is happening.",
346 var_labels: ["source_id", "worker_id"],
347 )),
348 retired_bytes: registry.register(metric!(
349 name: "mz_storage_upsert_backpressure_retired_bytes",
350 help:"A counter with the number of bytes retired by downstream processing.",
351 var_labels: ["source_id", "worker_id"],
352 )),
353 }
354 }
355}
356
/// Concrete metric instances for one (source, worker) pair of the upsert
/// operator, created from `UpsertMetricDefs`. All `DeleteOnDrop*` handles
/// deregister their series when this struct is dropped.
pub struct UpsertMetrics {
    pub(crate) rehydration_latency: DeleteOnDropGauge<AtomicF64, Vec<String>>,
    pub(crate) rehydration_total: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) rehydration_updates: DeleteOnDropGauge<AtomicU64, Vec<String>>,

    pub(crate) merge_snapshot_updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) merge_snapshot_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) merge_snapshot_deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) upsert_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) upsert_updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) upsert_deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_get_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_get_result_bytes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_get_result_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_put_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,

    // Shared (per-source, cross-worker) metrics; see `UpsertMetricDefs`.
    pub(crate) shared: Arc<UpsertSharedMetrics>,
    pub(crate) rocksdb_shared: Arc<mz_rocksdb::RocksDBSharedMetrics>,
    pub(crate) rocksdb_instance_metrics: Arc<mz_rocksdb::RocksDBInstanceMetrics>,
    // Kept only so the backpressure series stay registered for the lifetime
    // of this struct; never read.
    _backpressure_metrics: Option<BackpressureMetrics>,
}
381
382impl UpsertMetrics {
383 pub(crate) fn new(
385 defs: &UpsertMetricDefs,
386 source_id: GlobalId,
387 worker_id: usize,
388 backpressure_metrics: Option<BackpressureMetrics>,
389 ) -> Self {
390 let source_id_s = source_id.to_string();
391 let worker_id = worker_id.to_string();
392 Self {
393 rehydration_latency: defs
394 .rehydration_latency
395 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
396 rehydration_total: defs
397 .rehydration_total
398 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
399 rehydration_updates: defs
400 .rehydration_updates
401 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
402 merge_snapshot_updates: defs
403 .merge_snapshot_updates
404 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
405 merge_snapshot_inserts: defs
406 .merge_snapshot_inserts
407 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
408 merge_snapshot_deletes: defs
409 .merge_snapshot_deletes
410 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
411 upsert_inserts: defs
412 .upsert_inserts
413 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
414 upsert_updates: defs
415 .upsert_updates
416 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
417 upsert_deletes: defs
418 .upsert_deletes
419 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
420 multi_get_size: defs
421 .multi_get_size
422 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
423 multi_get_result_count: defs
424 .multi_get_result_count
425 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
426 multi_get_result_bytes: defs
427 .multi_get_result_bytes
428 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
429 multi_put_size: defs
430 .multi_put_size
431 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
432
433 shared: defs.shared(&source_id),
434 rocksdb_shared: defs.rocksdb_shared(&source_id),
435 rocksdb_instance_metrics: Arc::new(RocksDBInstanceMetrics {
436 multi_get_size: defs
437 .rocksdb_multi_get_size
438 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
439 multi_get_result_count: defs
440 .rocksdb_multi_get_result_count
441 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
442 multi_get_result_bytes: defs
443 .rocksdb_multi_get_result_bytes
444 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
445 multi_get_count: defs
446 .rocksdb_multi_get_count
447 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
448 multi_put_count: defs
449 .rocksdb_multi_put_count
450 .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
451 multi_put_size: defs
452 .rocksdb_multi_put_size
453 .get_delete_on_drop_metric(vec![source_id_s, worker_id]),
454 }),
455 _backpressure_metrics: backpressure_metrics,
456 }
457 }
458}