mz_storage/metrics/
upsert.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for upsert sources.
11
12use std::collections::BTreeMap;
13use std::sync::{Arc, Mutex, Weak};
14
15use mz_ore::metric;
16use mz_ore::metrics::{
17    DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec, HistogramVec,
18    IntCounterVec, MetricsRegistry, UIntGaugeVec,
19};
20use mz_ore::stats::histogram_seconds_buckets;
21use mz_repr::GlobalId;
22use mz_rocksdb::RocksDBInstanceMetrics;
23use mz_storage_operators::metrics::BackpressureMetrics;
24use prometheus::core::{AtomicF64, AtomicU64};
25
/// Metric definitions for the `upsert` operator.
#[derive(Clone, Debug)]
pub(crate) struct UpsertMetricDefs {
    // Per-worker rehydration metrics, labeled by `source_id` and `worker_id`.
    pub(crate) rehydration_latency: GaugeVec,
    pub(crate) rehydration_total: UIntGaugeVec,
    pub(crate) rehydration_updates: UIntGaugeVec,

    // Metric will contain either 0, to denote in-memory state usage,
    // or 1, to denote auto spill to rocksdb.
    pub(crate) rocksdb_autospill_in_use: UIntGaugeVec,

    // These are used by `shared`.
    pub(crate) merge_snapshot_latency: HistogramVec,
    pub(crate) merge_snapshot_updates: IntCounterVec,
    pub(crate) merge_snapshot_inserts: IntCounterVec,
    pub(crate) merge_snapshot_deletes: IntCounterVec,
    pub(crate) upsert_inserts: IntCounterVec,
    pub(crate) upsert_updates: IntCounterVec,
    pub(crate) upsert_deletes: IntCounterVec,
    pub(crate) multi_get_latency: HistogramVec,
    pub(crate) multi_get_size: IntCounterVec,
    pub(crate) multi_get_result_count: IntCounterVec,
    pub(crate) multi_get_result_bytes: IntCounterVec,
    pub(crate) multi_put_latency: HistogramVec,
    pub(crate) multi_put_size: IntCounterVec,

    // These are used by `rocksdb`.
    pub(crate) rocksdb_multi_get_latency: HistogramVec,
    pub(crate) rocksdb_multi_get_size: IntCounterVec,
    pub(crate) rocksdb_multi_get_result_count: IntCounterVec,
    pub(crate) rocksdb_multi_get_result_bytes: IntCounterVec,
    pub(crate) rocksdb_multi_get_count: IntCounterVec,
    pub(crate) rocksdb_multi_put_count: IntCounterVec,
    pub(crate) rocksdb_multi_put_latency: HistogramVec,
    pub(crate) rocksdb_multi_put_size: IntCounterVec,
    // These are maps so that multiple timely workers can interact with the same
    // `DeleteOnDropHistogram`, which is only dropped once ALL workers drop it.
    // The map may contain arbitrary, old `Weak`s for deleted sources, which are
    // only cleaned if those sources are recreated.
    //
    // We don't parameterize these by `worker_id` like the `rehydration_*` ones
    // to save on time-series cardinality.
    pub(crate) shared: Arc<Mutex<BTreeMap<GlobalId, Weak<UpsertSharedMetrics>>>>,
    pub(crate) rocksdb_shared:
        Arc<Mutex<BTreeMap<GlobalId, Weak<mz_rocksdb::RocksDBSharedMetrics>>>>,
}
72
73impl UpsertMetricDefs {
74    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
75        let shared = Arc::new(Mutex::new(BTreeMap::new()));
76        let rocksdb_shared = Arc::new(Mutex::new(BTreeMap::new()));
77        Self {
78            rehydration_latency: registry.register(metric!(
79                name: "mz_storage_upsert_state_rehydration_latency",
80                help: "The latency, per-worker, in fractional seconds, \
81                    of rehydrating the upsert state for this source",
82                var_labels: ["source_id", "worker_id"],
83            )),
84            rehydration_total: registry.register(metric!(
85                name: "mz_storage_upsert_state_rehydration_total",
86                help: "The number of values \
87                    per-worker, rehydrated into the upsert state for \
88                    this source",
89                var_labels: ["source_id", "worker_id"],
90            )),
91            rehydration_updates: registry.register(metric!(
92                name: "mz_storage_upsert_state_rehydration_updates",
93                help: "The number of updates (both negative and positive), \
94                    per-worker, rehydrated into the upsert state for \
95                    this source",
96                var_labels: ["source_id", "worker_id"],
97            )),
98            rocksdb_autospill_in_use: registry.register(metric!(
99                name: "mz_storage_upsert_state_rocksdb_autospill_in_use",
100                help: "Flag to denote whether upsert state has spilled to rocksdb \
101                    or using in-memory state",
102                var_labels: ["source_id", "worker_id"],
103            )),
104            // Choose a relatively low number of representative buckets.
105            merge_snapshot_latency: registry.register(metric!(
106                name: "mz_storage_upsert_merge_snapshot_latency",
107                help: "The latencies, in fractional seconds, \
108                    of merging snapshot updates into upsert state for this source. \
109                    Specific implementations of upsert state may have more detailed \
110                    metrics about sub-batches.",
111                var_labels: ["source_id"],
112                buckets: histogram_seconds_buckets(0.000_500, 32.0),
113            )),
114            merge_snapshot_updates: registry.register(metric!(
115                name: "mz_storage_upsert_merge_snapshot_updates_total",
116                help: "The batch size, \
117                    of merging snapshot updates into upsert state for this source. \
118                    Specific implementations of upsert state may have more detailed \
119                    metrics about sub-batches.",
120                var_labels: ["source_id", "worker_id"],
121            )),
122            merge_snapshot_inserts: registry.register(metric!(
123                name: "mz_storage_upsert_merge_snapshot_inserts_total",
124                help: "The number of inserts in a batch for merging snapshot updates \
125                    for this source.",
126                var_labels: ["source_id", "worker_id"],
127            )),
128            merge_snapshot_deletes: registry.register(metric!(
129                name: "mz_storage_upsert_merge_snapshot_deletes_total",
130                help: "The number of deletes in a batch for merging snapshot updates \
131                    for this source.",
132                var_labels: ["source_id", "worker_id"],
133            )),
134            upsert_inserts: registry.register(metric!(
135                name: "mz_storage_upsert_inserts_total",
136                help: "The number of inserts done by the upsert operator",
137                var_labels: ["source_id", "worker_id"],
138            )),
139            upsert_updates: registry.register(metric!(
140                name: "mz_storage_upsert_updates_total",
141                help: "The number of updates done by the upsert operator",
142                var_labels: ["source_id", "worker_id"],
143            )),
144            upsert_deletes: registry.register(metric!(
145                name: "mz_storage_upsert_deletes_total",
146                help: "The number of deletes done by the upsert operator.",
147                var_labels: ["source_id", "worker_id"],
148            )),
149            multi_get_latency: registry.register(metric!(
150                name: "mz_storage_upsert_multi_get_latency",
151                help: "The latencies, in fractional seconds, \
152                    of getting values from the upsert state for this source. \
153                    Specific implementations of upsert state may have more detailed \
154                    metrics about sub-batches.",
155                var_labels: ["source_id"],
156                buckets: histogram_seconds_buckets(0.000_500, 32.0),
157            )),
158            multi_get_size: registry.register(metric!(
159                name: "mz_storage_upsert_multi_get_size_total",
160                help: "The batch size, \
161                    of getting values from the upsert state for this source. \
162                    Specific implementations of upsert state may have more detailed \
163                    metrics about sub-batches.",
164                var_labels: ["source_id", "worker_id"],
165            )),
166            multi_get_result_count: registry.register(metric!(
167                name: "mz_storage_upsert_multi_get_result_count_total",
168                help: "The number of non-empty records returned in a multi_get batch. \
169                    Specific implementations of upsert state may have more detailed \
170                    metrics about sub-batches.",
171                var_labels: ["source_id", "worker_id"],
172            )),
173            multi_get_result_bytes: registry.register(metric!(
174                name: "mz_storage_upsert_multi_get_result_bytes_total",
175                help: "The total size of records returned in a multi_get batch. \
176                    Specific implementations of upsert state may have more detailed \
177                    metrics about sub-batches.",
178                var_labels: ["source_id", "worker_id"],
179            )),
180            multi_put_latency: registry.register(metric!(
181                name: "mz_storage_upsert_multi_put_latency",
182                help: "The latencies, in fractional seconds, \
183                    of getting values into the upsert state for this source. \
184                    Specific implementations of upsert state may have more detailed \
185                    metrics about sub-batches.",
186                var_labels: ["source_id"],
187                buckets: histogram_seconds_buckets(0.000_500, 32.0),
188            )),
189            multi_put_size: registry.register(metric!(
190                name: "mz_storage_upsert_multi_put_size_total",
191                help: "The batch size, \
192                    of getting values into the upsert state for this source. \
193                    Specific implementations of upsert state may have more detailed \
194                    metrics about sub-batches.",
195                var_labels: ["source_id", "worker_id"],
196            )),
197            shared,
198            rocksdb_multi_get_latency: registry.register(metric!(
199                name: "mz_storage_rocksdb_multi_get_latency",
200                help: "The latencies, in fractional seconds, \
201                    of getting batches of values from RocksDB for this source.",
202                var_labels: ["source_id"],
203                buckets: histogram_seconds_buckets(0.000_500, 32.0),
204            )),
205            rocksdb_multi_get_size: registry.register(metric!(
206                name: "mz_storage_rocksdb_multi_get_size_total",
207                help: "The batch size, \
208                    of getting batches of values from RocksDB for this source.",
209                var_labels: ["source_id", "worker_id"],
210            )),
211            rocksdb_multi_get_result_count: registry.register(metric!(
212                name: "mz_storage_rocksdb_multi_get_result_count_total",
213                help: "The number of non-empty records returned, \
214                    when getting batches of values from RocksDB for this source.",
215                var_labels: ["source_id", "worker_id"],
216            )),
217            rocksdb_multi_get_result_bytes: registry.register(metric!(
218                name: "mz_storage_rocksdb_multi_get_result_bytes_total",
219                help: "The total size of records returned, \
220                    when getting batches of values from RocksDB for this source.",
221                var_labels: ["source_id", "worker_id"],
222            )),
223            rocksdb_multi_get_count: registry.register(metric!(
224                name: "mz_storage_rocksdb_multi_get_count_total",
225                help: "The number of calls to rocksdb multi_get.",
226                var_labels: ["source_id", "worker_id"],
227            )),
228            rocksdb_multi_put_count: registry.register(metric!(
229                name: "mz_storage_rocksdb_multi_put_count_total",
230                help: "The number of calls to rocksdb multi_put.",
231                var_labels: ["source_id", "worker_id"],
232            )),
233            rocksdb_multi_put_latency: registry.register(metric!(
234                name: "mz_storage_rocksdb_multi_put_latency",
235                help: "The latencies, in fractional seconds, \
236                    of putting batches of values into RocksDB for this source.",
237                var_labels: ["source_id"],
238                buckets: histogram_seconds_buckets(0.000_500, 32.0),
239            )),
240            rocksdb_multi_put_size: registry.register(metric!(
241                name: "mz_storage_rocksdb_multi_put_size_total",
242                help: "The batch size, \
243                    of putting batches of values into RocksDB for this source.",
244                var_labels: ["source_id", "worker_id"],
245            )),
246            rocksdb_shared,
247        }
248    }
249
250    /// Get a shared-across-workers instance of an `UpsertSharedMetrics`.
251    pub(crate) fn shared(&self, source_id: &GlobalId) -> Arc<UpsertSharedMetrics> {
252        let mut shared = self.shared.lock().expect("mutex poisoned");
253        if let Some(shared_metrics) = shared.get(source_id) {
254            if let Some(shared_metrics) = shared_metrics.upgrade() {
255                return Arc::clone(&shared_metrics);
256            } else {
257                assert!(shared.remove(source_id).is_some());
258            }
259        }
260        let shared_metrics = Arc::new(UpsertSharedMetrics::new(self, *source_id));
261        assert!(
262            shared
263                .insert(source_id.clone(), Arc::downgrade(&shared_metrics))
264                .is_none()
265        );
266        shared_metrics
267    }
268
269    /// Get a shared-across-workers instance of an `RocksDBSharedMetrics`
270    pub(crate) fn rocksdb_shared(
271        &self,
272        source_id: &GlobalId,
273    ) -> Arc<mz_rocksdb::RocksDBSharedMetrics> {
274        let mut rocksdb = self.rocksdb_shared.lock().expect("mutex poisoned");
275        if let Some(shared_metrics) = rocksdb.get(source_id) {
276            if let Some(shared_metrics) = shared_metrics.upgrade() {
277                return Arc::clone(&shared_metrics);
278            } else {
279                assert!(rocksdb.remove(source_id).is_some());
280            }
281        }
282
283        let rocksdb_metrics = {
284            let source_id = source_id.to_string();
285            mz_rocksdb::RocksDBSharedMetrics {
286                multi_get_latency: self
287                    .rocksdb_multi_get_latency
288                    .get_delete_on_drop_metric(vec![source_id.clone()]),
289                multi_put_latency: self
290                    .rocksdb_multi_put_latency
291                    .get_delete_on_drop_metric(vec![source_id.clone()]),
292            }
293        };
294
295        let rocksdb_metrics = Arc::new(rocksdb_metrics);
296        assert!(
297            rocksdb
298                .insert(source_id.clone(), Arc::downgrade(&rocksdb_metrics))
299                .is_none()
300        );
301        rocksdb_metrics
302    }
303}
304
/// Metrics for upsert source shared across workers.
#[derive(Debug)]
pub(crate) struct UpsertSharedMetrics {
    // Per-source (not per-worker) histograms; dropped from the registry only
    // once every worker drops its `Arc<UpsertSharedMetrics>`.
    pub(crate) merge_snapshot_latency: DeleteOnDropHistogram<Vec<String>>,
    pub(crate) multi_get_latency: DeleteOnDropHistogram<Vec<String>>,
    pub(crate) multi_put_latency: DeleteOnDropHistogram<Vec<String>>,
}
312
313impl UpsertSharedMetrics {
314    /// Create an `UpsertSharedMetrics` from the `UpsertMetricDefs`
315    fn new(metrics: &UpsertMetricDefs, source_id: GlobalId) -> Self {
316        let source_id = source_id.to_string();
317        UpsertSharedMetrics {
318            merge_snapshot_latency: metrics
319                .merge_snapshot_latency
320                .get_delete_on_drop_metric(vec![source_id.clone()]),
321            multi_get_latency: metrics
322                .multi_get_latency
323                .get_delete_on_drop_metric(vec![source_id.clone()]),
324            multi_put_latency: metrics
325                .multi_put_latency
326                .get_delete_on_drop_metric(vec![source_id.clone()]),
327        }
328    }
329}
330
/// Metrics related to backpressure in `UPSERT` dataflows.
#[derive(Clone, Debug)]
pub(crate) struct UpsertBackpressureMetricDefs {
    // Counters/gauges labeled by `source_id` and `worker_id`; see
    // `register_with` for the registered metric names and help texts.
    pub(crate) emitted_bytes: IntCounterVec,
    pub(crate) last_backpressured_bytes: UIntGaugeVec,
    pub(crate) retired_bytes: IntCounterVec,
}
338
339impl UpsertBackpressureMetricDefs {
340    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
341        // We add a `worker_id` label here, even though only 1 worker is ever
342        // active, as this is the simplest way to avoid the non-active
343        // workers from un-registering metrics. This is consistent with how
344        // `persist_sink` metrics work.
345        Self {
346            emitted_bytes: registry.register(metric!(
347                name: "mz_storage_upsert_backpressure_emitted_bytes",
348                help: "A counter with the number of emitted bytes.",
349                var_labels: ["source_id", "worker_id"],
350            )),
351            last_backpressured_bytes: registry.register(metric!(
352                name: "mz_storage_upsert_backpressure_last_backpressured_bytes",
353                help: "The last count of bytes we are waiting to be retired in \
354                    the operator. This cannot be directly compared to \
355                    `retired_bytes`, but CAN indicate that backpressure is happening.",
356                var_labels: ["source_id", "worker_id"],
357            )),
358            retired_bytes: registry.register(metric!(
359                name: "mz_storage_upsert_backpressure_retired_bytes",
360                help:"A counter with the number of bytes retired by downstream processing.",
361                var_labels: ["source_id", "worker_id"],
362            )),
363        }
364    }
365}
366
/// Metrics for the `upsert` operator.
pub struct UpsertMetrics {
    // Per-worker `delete-on-drop` metrics, labeled by `source_id` and
    // `worker_id`; unregistered when this struct is dropped.
    pub(crate) rehydration_latency: DeleteOnDropGauge<AtomicF64, Vec<String>>,
    pub(crate) rehydration_total: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) rehydration_updates: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) rocksdb_autospill_in_use: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,

    pub(crate) merge_snapshot_updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) merge_snapshot_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) merge_snapshot_deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) upsert_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) upsert_updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) upsert_deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_get_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_get_result_bytes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_get_result_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) multi_put_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,

    // Handles to the shared (per-source, cross-worker) metric instances.
    pub(crate) shared: Arc<UpsertSharedMetrics>,
    pub(crate) rocksdb_shared: Arc<mz_rocksdb::RocksDBSharedMetrics>,
    pub(crate) rocksdb_instance_metrics: Arc<mz_rocksdb::RocksDBInstanceMetrics>,
    // `UpsertMetrics` keeps a reference (through `Arc`'s) to backpressure metrics, so that
    // they are not dropped when the `persist_source` operator is dropped.
    _backpressure_metrics: Option<BackpressureMetrics>,
}
392
393impl UpsertMetrics {
394    /// Create an `UpsertMetrics` from the `UpsertMetricDefs`.
395    pub(crate) fn new(
396        defs: &UpsertMetricDefs,
397        source_id: GlobalId,
398        worker_id: usize,
399        backpressure_metrics: Option<BackpressureMetrics>,
400    ) -> Self {
401        let source_id_s = source_id.to_string();
402        let worker_id = worker_id.to_string();
403        Self {
404            rehydration_latency: defs
405                .rehydration_latency
406                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
407            rehydration_total: defs
408                .rehydration_total
409                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
410            rehydration_updates: defs
411                .rehydration_updates
412                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
413            rocksdb_autospill_in_use: Arc::new(
414                defs.rocksdb_autospill_in_use
415                    .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
416            ),
417            merge_snapshot_updates: defs
418                .merge_snapshot_updates
419                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
420            merge_snapshot_inserts: defs
421                .merge_snapshot_inserts
422                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
423            merge_snapshot_deletes: defs
424                .merge_snapshot_deletes
425                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
426            upsert_inserts: defs
427                .upsert_inserts
428                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
429            upsert_updates: defs
430                .upsert_updates
431                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
432            upsert_deletes: defs
433                .upsert_deletes
434                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
435            multi_get_size: defs
436                .multi_get_size
437                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
438            multi_get_result_count: defs
439                .multi_get_result_count
440                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
441            multi_get_result_bytes: defs
442                .multi_get_result_bytes
443                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
444            multi_put_size: defs
445                .multi_put_size
446                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
447
448            shared: defs.shared(&source_id),
449            rocksdb_shared: defs.rocksdb_shared(&source_id),
450            rocksdb_instance_metrics: Arc::new(RocksDBInstanceMetrics {
451                multi_get_size: defs
452                    .rocksdb_multi_get_size
453                    .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
454                multi_get_result_count: defs
455                    .rocksdb_multi_get_result_count
456                    .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
457                multi_get_result_bytes: defs
458                    .rocksdb_multi_get_result_bytes
459                    .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
460                multi_get_count: defs
461                    .rocksdb_multi_get_count
462                    .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
463                multi_put_count: defs
464                    .rocksdb_multi_put_count
465                    .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
466                multi_put_size: defs
467                    .rocksdb_multi_put_size
468                    .get_delete_on_drop_metric(vec![source_id_s, worker_id]),
469            }),
470            _backpressure_metrics: backpressure_metrics,
471        }
472    }
473}