1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeSet;
use std::ops::{Deref, DerefMut};
use std::sync::{Mutex, MutexGuard};

use mz_ore::cast::CastFrom;
use mz_ore::metric;
use mz_ore::metrics::{MetricsRegistry, UIntGauge};
use mz_persist_types::ShardId;
use prometheus::Counter;

#[derive(Debug)]
pub struct StorageCollectionsMetrics {
    pub finalization_outstanding: UIntGauge,
    pub finalization_pending_commit: UIntGauge,
    pub finalization_started: Counter,
    pub finalization_succeeded: Counter,
    pub finalization_failed: Counter,
}

impl StorageCollectionsMetrics {
    pub fn register_into(registry: &MetricsRegistry) -> Self {
        StorageCollectionsMetrics {
            finalization_outstanding: registry.register(metric!(
                name: "mz_shard_finalization_outstanding",
                help: "count of shards in need of finalization",
            )),
            finalization_pending_commit: registry.register(metric!(
                name: "mz_shard_finalization_pending_commit",
                help: "count of shards for which finalization has completed but has not yet been durably recorded",
            )),
            finalization_started: registry.register(metric!(
                name: "mz_shard_finalization_op_started",
                help: "count of shard finalization operations that have started",
            )),
            finalization_succeeded: registry.register(metric!(
                name: "mz_shard_finalization_op_succeeded",
                help: "count of shard finalization operations that succeeded",
            )),
            finalization_failed: registry.register(metric!(
                name: "mz_shard_finalization_op_failed",
                help: "count of shard finalization operations that failed",
            )),
        }
    }
}

/// A set of shard IDs that maintains a gauge containing the set's size.
#[derive(Debug)]
pub struct ShardIdSet {
    set: Mutex<BTreeSet<ShardId>>,
    gauge: UIntGauge,
}

impl ShardIdSet {
    pub fn new(gauge: UIntGauge) -> ShardIdSet {
        ShardIdSet {
            set: Mutex::new(BTreeSet::new()),
            gauge,
        }
    }

    pub fn lock(&self) -> ShardIdSetGuard {
        ShardIdSetGuard {
            set: self.set.lock().expect("lock poisoned"),
            gauge: &self.gauge,
        }
    }
}

#[derive(Debug)]
pub struct ShardIdSetGuard<'a> {
    set: MutexGuard<'a, BTreeSet<ShardId>>,
    gauge: &'a UIntGauge,
}

impl Drop for ShardIdSetGuard<'_> {
    fn drop(&mut self) {
        self.gauge.set(u64::cast_from(self.set.len()));
    }
}

impl<'a> Deref for ShardIdSetGuard<'a> {
    type Target = BTreeSet<ShardId>;

    fn deref(&self) -> &Self::Target {
        self.set.deref()
    }
}

impl DerefMut for ShardIdSetGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.set.deref_mut()
    }
}