mz_storage_client/storage_collections/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::ops::{Deref, DerefMut};
12use std::sync::{Mutex, MutexGuard};
13
14use mz_ore::cast::CastFrom;
15use mz_ore::metric;
16use mz_ore::metrics::{MetricsRegistry, UIntGauge};
17use mz_persist_types::ShardId;
18use prometheus::Counter;
19
20#[derive(Debug)]
21pub struct StorageCollectionsMetrics {
22    pub finalization_outstanding: UIntGauge,
23    pub finalization_pending_commit: UIntGauge,
24    pub finalization_started: Counter,
25    pub finalization_succeeded: Counter,
26    pub finalization_failed: Counter,
27}
28
29impl StorageCollectionsMetrics {
30    pub fn register_into(registry: &MetricsRegistry) -> Self {
31        StorageCollectionsMetrics {
32            finalization_outstanding: registry.register(metric!(
33                name: "mz_shard_finalization_outstanding",
34                help: "count of shards in need of finalization",
35            )),
36            finalization_pending_commit: registry.register(metric!(
37                name: "mz_shard_finalization_pending_commit",
38                help: "count of shards for which finalization has completed but has not yet been durably recorded",
39            )),
40            finalization_started: registry.register(metric!(
41                name: "mz_shard_finalization_op_started",
42                help: "count of shard finalization operations that have started",
43            )),
44            finalization_succeeded: registry.register(metric!(
45                name: "mz_shard_finalization_op_succeeded",
46                help: "count of shard finalization operations that succeeded",
47            )),
48            finalization_failed: registry.register(metric!(
49                name: "mz_shard_finalization_op_failed",
50                help: "count of shard finalization operations that failed",
51            )),
52        }
53    }
54}
55
56/// A set of shard IDs that maintains a gauge containing the set's size.
57#[derive(Debug)]
58pub struct ShardIdSet {
59    set: Mutex<BTreeSet<ShardId>>,
60    gauge: UIntGauge,
61}
62
63impl ShardIdSet {
64    pub fn new(gauge: UIntGauge) -> ShardIdSet {
65        ShardIdSet {
66            set: Mutex::new(BTreeSet::new()),
67            gauge,
68        }
69    }
70
71    pub fn lock(&self) -> ShardIdSetGuard<'_> {
72        ShardIdSetGuard {
73            set: self.set.lock().expect("lock poisoned"),
74            gauge: &self.gauge,
75        }
76    }
77}
78
79#[derive(Debug)]
80pub struct ShardIdSetGuard<'a> {
81    set: MutexGuard<'a, BTreeSet<ShardId>>,
82    gauge: &'a UIntGauge,
83}
84
85impl Drop for ShardIdSetGuard<'_> {
86    fn drop(&mut self) {
87        self.gauge.set(u64::cast_from(self.set.len()));
88    }
89}
90
91impl<'a> Deref for ShardIdSetGuard<'a> {
92    type Target = BTreeSet<ShardId>;
93
94    fn deref(&self) -> &Self::Target {
95        self.set.deref()
96    }
97}
98
99impl DerefMut for ShardIdSetGuard<'_> {
100    fn deref_mut(&mut self) -> &mut Self::Target {
101        self.set.deref_mut()
102    }
103}