mz_storage/metrics/upsert.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for upsert sources.
11
12use std::collections::BTreeMap;
13use std::sync::{Arc, Mutex, Weak};
14
15use mz_ore::metric;
16use mz_ore::metrics::{
17    DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec, HistogramVec,
18    IntCounterVec, MetricsRegistry, UIntGaugeVec,
19};
20use mz_ore::stats::histogram_seconds_buckets;
21use mz_repr::GlobalId;
22use mz_rocksdb::RocksDBInstanceMetrics;
23use mz_storage_operators::metrics::BackpressureMetrics;
24use prometheus::core::{AtomicF64, AtomicU64};
25
/// Metric definitions for the `upsert` operator.
#[derive(Clone, Debug)]
pub(crate) struct UpsertMetricDefs {
    /// Per-worker latency, in fractional seconds, of rehydrating upsert state.
    pub(crate) rehydration_latency: GaugeVec,
    /// Per-worker count of values rehydrated into upsert state.
    pub(crate) rehydration_total: UIntGaugeVec,
    /// Per-worker count of updates (both negative and positive) rehydrated
    /// into upsert state.
    pub(crate) rehydration_updates: UIntGaugeVec,

    // These are used by `shared`.
    /// Latencies of merging snapshot updates into upsert state (per source).
    pub(crate) merge_snapshot_latency: HistogramVec,
    /// Batch sizes of snapshot merges into upsert state.
    pub(crate) merge_snapshot_updates: IntCounterVec,
    /// Inserts per snapshot-merge batch.
    pub(crate) merge_snapshot_inserts: IntCounterVec,
    /// Deletes per snapshot-merge batch.
    pub(crate) merge_snapshot_deletes: IntCounterVec,
    /// Inserts performed by the upsert operator.
    pub(crate) upsert_inserts: IntCounterVec,
    /// Updates performed by the upsert operator.
    pub(crate) upsert_updates: IntCounterVec,
    /// Deletes performed by the upsert operator.
    pub(crate) upsert_deletes: IntCounterVec,
    /// Latencies of `multi_get` batches against upsert state (per source).
    pub(crate) multi_get_latency: HistogramVec,
    /// Batch sizes of `multi_get` calls against upsert state.
    pub(crate) multi_get_size: IntCounterVec,
    /// Non-empty records returned per `multi_get` batch.
    pub(crate) multi_get_result_count: IntCounterVec,
    /// Total bytes returned per `multi_get` batch.
    pub(crate) multi_get_result_bytes: IntCounterVec,
    /// Latencies of `multi_put` batches against upsert state (per source).
    pub(crate) multi_put_latency: HistogramVec,
    /// Batch sizes of `multi_put` calls against upsert state.
    pub(crate) multi_put_size: IntCounterVec,

    // These are used by `rocksdb`.
    /// Latencies of batched gets from RocksDB (per source).
    pub(crate) rocksdb_multi_get_latency: HistogramVec,
    /// Batch sizes of batched gets from RocksDB.
    pub(crate) rocksdb_multi_get_size: IntCounterVec,
    /// Non-empty records returned by batched gets from RocksDB.
    pub(crate) rocksdb_multi_get_result_count: IntCounterVec,
    /// Total bytes returned by batched gets from RocksDB.
    pub(crate) rocksdb_multi_get_result_bytes: IntCounterVec,
    /// Number of calls to RocksDB `multi_get`.
    pub(crate) rocksdb_multi_get_count: IntCounterVec,
    /// Number of calls to RocksDB `multi_put`.
    pub(crate) rocksdb_multi_put_count: IntCounterVec,
    /// Latencies of batched puts into RocksDB (per source).
    pub(crate) rocksdb_multi_put_latency: HistogramVec,
    /// Batch sizes of batched puts into RocksDB.
    pub(crate) rocksdb_multi_put_size: IntCounterVec,
    // These are maps so that multiple timely workers can interact with the same
    // `DeleteOnDropHistogram`, which is only dropped once ALL workers drop it.
    // The map may contain arbitrary, old `Weak`s for deleted sources, which are
    // only cleaned if those sources are recreated.
    //
    // We don't parameterize these by `worker_id` like the `rehydration_*` ones
    // to save on time-series cardinality.
    pub(crate) shared: Arc<Mutex<BTreeMap<GlobalId, Weak<UpsertSharedMetrics>>>>,
    pub(crate) rocksdb_shared:
        Arc<Mutex<BTreeMap<GlobalId, Weak<mz_rocksdb::RocksDBSharedMetrics>>>>,
}
68
impl UpsertMetricDefs {
    /// Register all `upsert` metric definitions with `registry` and return them.
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        // Per-source shared-metric maps; populated lazily by `shared` and
        // `rocksdb_shared` below.
        let shared = Arc::new(Mutex::new(BTreeMap::new()));
        let rocksdb_shared = Arc::new(Mutex::new(BTreeMap::new()));
        Self {
            rehydration_latency: registry.register(metric!(
                name: "mz_storage_upsert_state_rehydration_latency",
                help: "The latency, per-worker, in fractional seconds, \
                    of rehydrating the upsert state for this source",
                var_labels: ["source_id", "worker_id"],
            )),
            rehydration_total: registry.register(metric!(
                name: "mz_storage_upsert_state_rehydration_total",
                help: "The number of values \
                    per-worker, rehydrated into the upsert state for \
                    this source",
                var_labels: ["source_id", "worker_id"],
            )),
            rehydration_updates: registry.register(metric!(
                name: "mz_storage_upsert_state_rehydration_updates",
                help: "The number of updates (both negative and positive), \
                    per-worker, rehydrated into the upsert state for \
                    this source",
                var_labels: ["source_id", "worker_id"],
            )),
            // Choose a relatively low number of representative buckets.
            merge_snapshot_latency: registry.register(metric!(
                name: "mz_storage_upsert_merge_snapshot_latency",
                help: "The latencies, in fractional seconds, \
                    of merging snapshot updates into upsert state for this source. \
                    Specific implementations of upsert state may have more detailed \
                    metrics about sub-batches.",
                var_labels: ["source_id"],
                buckets: histogram_seconds_buckets(0.000_500, 32.0),
            )),
            merge_snapshot_updates: registry.register(metric!(
                name: "mz_storage_upsert_merge_snapshot_updates_total",
                help: "The batch size, \
                    of merging snapshot updates into upsert state for this source. \
                    Specific implementations of upsert state may have more detailed \
                    metrics about sub-batches.",
                var_labels: ["source_id", "worker_id"],
            )),
            merge_snapshot_inserts: registry.register(metric!(
                name: "mz_storage_upsert_merge_snapshot_inserts_total",
                help: "The number of inserts in a batch for merging snapshot updates \
                    for this source.",
                var_labels: ["source_id", "worker_id"],
            )),
            merge_snapshot_deletes: registry.register(metric!(
                name: "mz_storage_upsert_merge_snapshot_deletes_total",
                help: "The number of deletes in a batch for merging snapshot updates \
                    for this source.",
                var_labels: ["source_id", "worker_id"],
            )),
            upsert_inserts: registry.register(metric!(
                name: "mz_storage_upsert_inserts_total",
                help: "The number of inserts done by the upsert operator",
                var_labels: ["source_id", "worker_id"],
            )),
            upsert_updates: registry.register(metric!(
                name: "mz_storage_upsert_updates_total",
                help: "The number of updates done by the upsert operator",
                var_labels: ["source_id", "worker_id"],
            )),
            upsert_deletes: registry.register(metric!(
                name: "mz_storage_upsert_deletes_total",
                help: "The number of deletes done by the upsert operator.",
                var_labels: ["source_id", "worker_id"],
            )),
            multi_get_latency: registry.register(metric!(
                name: "mz_storage_upsert_multi_get_latency",
                help: "The latencies, in fractional seconds, \
                    of getting values from the upsert state for this source. \
                    Specific implementations of upsert state may have more detailed \
                    metrics about sub-batches.",
                var_labels: ["source_id"],
                buckets: histogram_seconds_buckets(0.000_500, 32.0),
            )),
            multi_get_size: registry.register(metric!(
                name: "mz_storage_upsert_multi_get_size_total",
                help: "The batch size, \
                    of getting values from the upsert state for this source. \
                    Specific implementations of upsert state may have more detailed \
                    metrics about sub-batches.",
                var_labels: ["source_id", "worker_id"],
            )),
            multi_get_result_count: registry.register(metric!(
                name: "mz_storage_upsert_multi_get_result_count_total",
                help: "The number of non-empty records returned in a multi_get batch. \
                    Specific implementations of upsert state may have more detailed \
                    metrics about sub-batches.",
                var_labels: ["source_id", "worker_id"],
            )),
            multi_get_result_bytes: registry.register(metric!(
                name: "mz_storage_upsert_multi_get_result_bytes_total",
                help: "The total size of records returned in a multi_get batch. \
                    Specific implementations of upsert state may have more detailed \
                    metrics about sub-batches.",
                var_labels: ["source_id", "worker_id"],
            )),
            multi_put_latency: registry.register(metric!(
                name: "mz_storage_upsert_multi_put_latency",
                help: "The latencies, in fractional seconds, \
                    of getting values into the upsert state for this source. \
                    Specific implementations of upsert state may have more detailed \
                    metrics about sub-batches.",
                var_labels: ["source_id"],
                buckets: histogram_seconds_buckets(0.000_500, 32.0),
            )),
            multi_put_size: registry.register(metric!(
                name: "mz_storage_upsert_multi_put_size_total",
                help: "The batch size, \
                    of getting values into the upsert state for this source. \
                    Specific implementations of upsert state may have more detailed \
                    metrics about sub-batches.",
                var_labels: ["source_id", "worker_id"],
            )),
            shared,
            rocksdb_multi_get_latency: registry.register(metric!(
                name: "mz_storage_rocksdb_multi_get_latency",
                help: "The latencies, in fractional seconds, \
                    of getting batches of values from RocksDB for this source.",
                var_labels: ["source_id"],
                buckets: histogram_seconds_buckets(0.000_500, 32.0),
            )),
            rocksdb_multi_get_size: registry.register(metric!(
                name: "mz_storage_rocksdb_multi_get_size_total",
                help: "The batch size, \
                    of getting batches of values from RocksDB for this source.",
                var_labels: ["source_id", "worker_id"],
            )),
            rocksdb_multi_get_result_count: registry.register(metric!(
                name: "mz_storage_rocksdb_multi_get_result_count_total",
                help: "The number of non-empty records returned, \
                    when getting batches of values from RocksDB for this source.",
                var_labels: ["source_id", "worker_id"],
            )),
            rocksdb_multi_get_result_bytes: registry.register(metric!(
                name: "mz_storage_rocksdb_multi_get_result_bytes_total",
                help: "The total size of records returned, \
                    when getting batches of values from RocksDB for this source.",
                var_labels: ["source_id", "worker_id"],
            )),
            rocksdb_multi_get_count: registry.register(metric!(
                name: "mz_storage_rocksdb_multi_get_count_total",
                help: "The number of calls to rocksdb multi_get.",
                var_labels: ["source_id", "worker_id"],
            )),
            rocksdb_multi_put_count: registry.register(metric!(
                name: "mz_storage_rocksdb_multi_put_count_total",
                help: "The number of calls to rocksdb multi_put.",
                var_labels: ["source_id", "worker_id"],
            )),
            rocksdb_multi_put_latency: registry.register(metric!(
                name: "mz_storage_rocksdb_multi_put_latency",
                help: "The latencies, in fractional seconds, \
                    of putting batches of values into RocksDB for this source.",
                var_labels: ["source_id"],
                buckets: histogram_seconds_buckets(0.000_500, 32.0),
            )),
            rocksdb_multi_put_size: registry.register(metric!(
                name: "mz_storage_rocksdb_multi_put_size_total",
                help: "The batch size, \
                    of putting batches of values into RocksDB for this source.",
                var_labels: ["source_id", "worker_id"],
            )),
            rocksdb_shared,
        }
    }

    /// Get a shared-across-workers instance of an `UpsertSharedMetrics`.
    pub(crate) fn shared(&self, source_id: &GlobalId) -> Arc<UpsertSharedMetrics> {
        let mut shared = self.shared.lock().expect("mutex poisoned");
        if let Some(shared_metrics) = shared.get(source_id) {
            if let Some(shared_metrics) = shared_metrics.upgrade() {
                // Another worker already created the metrics for this source;
                // hand out another strong reference to the same instance.
                return Arc::clone(&shared_metrics);
            } else {
                // All workers dropped their `Arc`s (the source was shut down);
                // remove the stale `Weak` so we can insert a fresh entry below.
                assert!(shared.remove(source_id).is_some());
            }
        }
        // First worker for this incarnation of the source creates the metrics.
        let shared_metrics = Arc::new(UpsertSharedMetrics::new(self, *source_id));
        assert!(
            shared
                .insert(source_id.clone(), Arc::downgrade(&shared_metrics))
                .is_none()
        );
        shared_metrics
    }

    /// Get a shared-across-workers instance of an `RocksDBSharedMetrics`
    pub(crate) fn rocksdb_shared(
        &self,
        source_id: &GlobalId,
    ) -> Arc<mz_rocksdb::RocksDBSharedMetrics> {
        let mut rocksdb = self.rocksdb_shared.lock().expect("mutex poisoned");
        if let Some(shared_metrics) = rocksdb.get(source_id) {
            if let Some(shared_metrics) = shared_metrics.upgrade() {
                // Live entry: reuse the metrics another worker created.
                return Arc::clone(&shared_metrics);
            } else {
                // Stale `Weak` from a previous incarnation of this source.
                assert!(rocksdb.remove(source_id).is_some());
            }
        }

        // Build fresh delete-on-drop handles labeled by this source's id.
        let rocksdb_metrics = {
            let source_id = source_id.to_string();
            mz_rocksdb::RocksDBSharedMetrics {
                multi_get_latency: self
                    .rocksdb_multi_get_latency
                    .get_delete_on_drop_metric(vec![source_id.clone()]),
                multi_put_latency: self
                    .rocksdb_multi_put_latency
                    .get_delete_on_drop_metric(vec![source_id.clone()]),
            }
        };

        let rocksdb_metrics = Arc::new(rocksdb_metrics);
        assert!(
            rocksdb
                .insert(source_id.clone(), Arc::downgrade(&rocksdb_metrics))
                .is_none()
        );
        rocksdb_metrics
    }
}
294
/// Metrics for upsert source shared across workers.
#[derive(Debug)]
pub(crate) struct UpsertSharedMetrics {
    /// Latencies of merging snapshot updates into upsert state,
    /// labeled by source id only (shared across workers).
    pub(crate) merge_snapshot_latency: DeleteOnDropHistogram<Vec<String>>,
    /// Latencies of `multi_get` batches against upsert state.
    pub(crate) multi_get_latency: DeleteOnDropHistogram<Vec<String>>,
    /// Latencies of `multi_put` batches against upsert state.
    pub(crate) multi_put_latency: DeleteOnDropHistogram<Vec<String>>,
}
302
303impl UpsertSharedMetrics {
304    /// Create an `UpsertSharedMetrics` from the `UpsertMetricDefs`
305    fn new(metrics: &UpsertMetricDefs, source_id: GlobalId) -> Self {
306        let source_id = source_id.to_string();
307        UpsertSharedMetrics {
308            merge_snapshot_latency: metrics
309                .merge_snapshot_latency
310                .get_delete_on_drop_metric(vec![source_id.clone()]),
311            multi_get_latency: metrics
312                .multi_get_latency
313                .get_delete_on_drop_metric(vec![source_id.clone()]),
314            multi_put_latency: metrics
315                .multi_put_latency
316                .get_delete_on_drop_metric(vec![source_id.clone()]),
317        }
318    }
319}
320
/// Metrics related to backpressure in `UPSERT` dataflows.
#[derive(Clone, Debug)]
pub(crate) struct UpsertBackpressureMetricDefs {
    /// Number of bytes emitted by the backpressure operator.
    pub(crate) emitted_bytes: IntCounterVec,
    /// Last count of bytes waiting to be retired in the operator.
    pub(crate) last_backpressured_bytes: UIntGaugeVec,
    /// Number of bytes retired by downstream processing.
    pub(crate) retired_bytes: IntCounterVec,
}
328
329impl UpsertBackpressureMetricDefs {
330    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
331        // We add a `worker_id` label here, even though only 1 worker is ever
332        // active, as this is the simplest way to avoid the non-active
333        // workers from un-registering metrics. This is consistent with how
334        // `persist_sink` metrics work.
335        Self {
336            emitted_bytes: registry.register(metric!(
337                name: "mz_storage_upsert_backpressure_emitted_bytes",
338                help: "A counter with the number of emitted bytes.",
339                var_labels: ["source_id", "worker_id"],
340            )),
341            last_backpressured_bytes: registry.register(metric!(
342                name: "mz_storage_upsert_backpressure_last_backpressured_bytes",
343                help: "The last count of bytes we are waiting to be retired in \
344                    the operator. This cannot be directly compared to \
345                    `retired_bytes`, but CAN indicate that backpressure is happening.",
346                var_labels: ["source_id", "worker_id"],
347            )),
348            retired_bytes: registry.register(metric!(
349                name: "mz_storage_upsert_backpressure_retired_bytes",
350                help:"A counter with the number of bytes retired by downstream processing.",
351                var_labels: ["source_id", "worker_id"],
352            )),
353        }
354    }
355}
356
/// Metrics for the `upsert` operator.
pub struct UpsertMetrics {
    /// Per-worker rehydration latency, in fractional seconds.
    pub(crate) rehydration_latency: DeleteOnDropGauge<AtomicF64, Vec<String>>,
    /// Per-worker count of values rehydrated into upsert state.
    pub(crate) rehydration_total: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Per-worker count of updates rehydrated into upsert state.
    pub(crate) rehydration_updates: DeleteOnDropGauge<AtomicU64, Vec<String>>,

    /// Batch sizes of snapshot merges into upsert state.
    pub(crate) merge_snapshot_updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Inserts per snapshot-merge batch.
    pub(crate) merge_snapshot_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Deletes per snapshot-merge batch.
    pub(crate) merge_snapshot_deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Inserts performed by the upsert operator.
    pub(crate) upsert_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Updates performed by the upsert operator.
    pub(crate) upsert_updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Deletes performed by the upsert operator.
    pub(crate) upsert_deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Batch sizes of `multi_get` calls against upsert state.
    pub(crate) multi_get_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Total bytes returned per `multi_get` batch.
    pub(crate) multi_get_result_bytes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Non-empty records returned per `multi_get` batch.
    pub(crate) multi_get_result_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Batch sizes of `multi_put` calls against upsert state.
    pub(crate) multi_put_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,

    /// Shared-across-workers metrics for this source.
    pub(crate) shared: Arc<UpsertSharedMetrics>,
    /// Shared-across-workers RocksDB metrics for this source.
    pub(crate) rocksdb_shared: Arc<mz_rocksdb::RocksDBSharedMetrics>,
    /// Per-worker RocksDB instance metrics.
    pub(crate) rocksdb_instance_metrics: Arc<mz_rocksdb::RocksDBInstanceMetrics>,
    // `UpsertMetrics` keeps a reference (through `Arc`'s) to backpressure metrics, so that
    // they are not dropped when the `persist_source` operator is dropped.
    _backpressure_metrics: Option<BackpressureMetrics>,
}
381
382impl UpsertMetrics {
383    /// Create an `UpsertMetrics` from the `UpsertMetricDefs`.
384    pub(crate) fn new(
385        defs: &UpsertMetricDefs,
386        source_id: GlobalId,
387        worker_id: usize,
388        backpressure_metrics: Option<BackpressureMetrics>,
389    ) -> Self {
390        let source_id_s = source_id.to_string();
391        let worker_id = worker_id.to_string();
392        Self {
393            rehydration_latency: defs
394                .rehydration_latency
395                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
396            rehydration_total: defs
397                .rehydration_total
398                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
399            rehydration_updates: defs
400                .rehydration_updates
401                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
402            merge_snapshot_updates: defs
403                .merge_snapshot_updates
404                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
405            merge_snapshot_inserts: defs
406                .merge_snapshot_inserts
407                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
408            merge_snapshot_deletes: defs
409                .merge_snapshot_deletes
410                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
411            upsert_inserts: defs
412                .upsert_inserts
413                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
414            upsert_updates: defs
415                .upsert_updates
416                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
417            upsert_deletes: defs
418                .upsert_deletes
419                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
420            multi_get_size: defs
421                .multi_get_size
422                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
423            multi_get_result_count: defs
424                .multi_get_result_count
425                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
426            multi_get_result_bytes: defs
427                .multi_get_result_bytes
428                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
429            multi_put_size: defs
430                .multi_put_size
431                .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
432
433            shared: defs.shared(&source_id),
434            rocksdb_shared: defs.rocksdb_shared(&source_id),
435            rocksdb_instance_metrics: Arc::new(RocksDBInstanceMetrics {
436                multi_get_size: defs
437                    .rocksdb_multi_get_size
438                    .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
439                multi_get_result_count: defs
440                    .rocksdb_multi_get_result_count
441                    .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
442                multi_get_result_bytes: defs
443                    .rocksdb_multi_get_result_bytes
444                    .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
445                multi_get_count: defs
446                    .rocksdb_multi_get_count
447                    .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
448                multi_put_count: defs
449                    .rocksdb_multi_put_count
450                    .get_delete_on_drop_metric(vec![source_id_s.clone(), worker_id.clone()]),
451                multi_put_size: defs
452                    .rocksdb_multi_put_size
453                    .get_delete_on_drop_metric(vec![source_id_s, worker_id]),
454            }),
455            _backpressure_metrics: backpressure_metrics,
456        }
457    }
458}