mz_txn_wal/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Prometheus monitoring metrics.
11
12use std::future::Future;
13use std::time::Instant;
14
15use mz_ore::metric;
16use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry, UIntGauge};
17use prometheus::{CounterVec, IntCounterVec, IntGauge};
18
19/// Prometheus monitoring metrics.
20pub struct Metrics {
21    pub(crate) data_shard_count: UIntGauge,
22
23    pub(crate) register: FallibleOpMetrics,
24    pub(crate) commit: FallibleOpMetrics,
25    pub(crate) forget: FallibleOpMetrics,
26    pub(crate) forget_all: FallibleOpMetrics,
27    pub(crate) apply_le: InfallibleOpMetrics,
28    pub(crate) compact_to: InfallibleOpMetrics,
29
30    pub(crate) batches: BatchMetrics,
31}
32
33impl std::fmt::Debug for Metrics {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        f.debug_struct("Metrics").finish_non_exhaustive()
36    }
37}
38
39impl Metrics {
40    /// Returns a new [Metrics] instance connected to the given registry.
41    pub fn new(registry: &MetricsRegistry) -> Self {
42        let ops = OpsMetrics::new(registry);
43        Metrics {
44            data_shard_count: registry.register(metric!(
45                name: "mz_txn_data_shard_count",
46                help: "count of data shards registered to the txn set",
47            )),
48            register: ops.fallible("register"),
49            commit: ops.fallible("commit"),
50            forget: ops.fallible("forget"),
51            forget_all: ops.fallible("forget_all"),
52            apply_le: ops.infallible("apply_le"),
53            compact_to: ops.infallible("compact_to"),
54            batches: BatchMetrics::new(registry),
55        }
56    }
57}
58
59struct OpsMetrics {
60    started_count: IntCounterVec,
61    retry_count: IntCounterVec,
62    succeeded_count: IntCounterVec,
63    errored_count: IntCounterVec,
64    duration_seconds: CounterVec,
65}
66
67impl OpsMetrics {
68    fn new(registry: &MetricsRegistry) -> Self {
69        OpsMetrics {
70            started_count: registry.register(metric!(
71                name: "mz_txn_op_started_count",
72                help: "count of times a txn operation started",
73                var_labels: ["op"],
74            )),
75            retry_count: registry.register(metric!(
76                name: "mz_txn_op_retry_count",
77                help: "count of times a txn operation retried",
78                var_labels: ["op"],
79            )),
80            succeeded_count: registry.register(metric!(
81                name: "mz_txn_op_succeeded_count",
82                help: "count of times a txn operation succeeded",
83                var_labels: ["op"],
84            )),
85            errored_count: registry.register(metric!(
86                name: "mz_txn_op_errored_count",
87                help: "count of times a txn operation errored",
88                var_labels: ["op"],
89            )),
90            duration_seconds: registry.register(metric!(
91                name: "mz_txn_op_duration_seconds",
92                help: "time spent running a txn operation",
93                var_labels: ["op"],
94            )),
95        }
96    }
97
98    fn fallible(&self, name: &str) -> FallibleOpMetrics {
99        FallibleOpMetrics {
100            started_count: self.started_count.with_label_values(&[name]),
101            retry_count: self.retry_count.with_label_values(&[name]),
102            succeeded_count: self.succeeded_count.with_label_values(&[name]),
103            errored_count: self.errored_count.with_label_values(&[name]),
104            duration_seconds: self.duration_seconds.with_label_values(&[name]),
105        }
106    }
107
108    fn infallible(&self, name: &str) -> InfallibleOpMetrics {
109        InfallibleOpMetrics {
110            started_count: self.started_count.with_label_values(&[name]),
111            succeeded_count: self.succeeded_count.with_label_values(&[name]),
112            duration_seconds: self.duration_seconds.with_label_values(&[name]),
113        }
114    }
115}
116
117pub(crate) struct FallibleOpMetrics {
118    started_count: IntCounter,
119    pub(crate) retry_count: IntCounter,
120    succeeded_count: IntCounter,
121    errored_count: IntCounter,
122    duration_seconds: Counter,
123}
124
125impl FallibleOpMetrics {
126    pub(crate) async fn run<R, E>(&self, op: impl Future<Output = Result<R, E>>) -> Result<R, E> {
127        let start = Instant::now();
128        self.started_count.inc();
129        let res = op.await;
130        match res {
131            Ok(_) => self.succeeded_count.inc(),
132            Err(_) => self.errored_count.inc(),
133        }
134        self.duration_seconds.inc_by(start.elapsed().as_secs_f64());
135        res
136    }
137}
138
139pub(crate) struct InfallibleOpMetrics {
140    started_count: IntCounter,
141    succeeded_count: IntCounter,
142    duration_seconds: Counter,
143}
144
145impl InfallibleOpMetrics {
146    pub(crate) async fn run<R>(&self, op: impl Future<Output = R>) -> R {
147        let start = Instant::now();
148        self.started_count.inc();
149        let res = op.await;
150        self.succeeded_count.inc();
151        self.duration_seconds.inc_by(start.elapsed().as_secs_f64());
152        res
153    }
154}
155
156pub(crate) struct BatchMetrics {
157    pub(crate) commit_count: IntCounter,
158    pub(crate) commit_bytes: IntCounter,
159    pub(crate) unapplied_count: UIntGauge,
160    pub(crate) unapplied_bytes: UIntGauge,
161    pub(crate) unapplied_min_ts: IntGauge,
162}
163
164impl BatchMetrics {
165    fn new(registry: &MetricsRegistry) -> Self {
166        BatchMetrics {
167            commit_count: registry.register(metric!(
168                name: "mz_txn_batch_commit_count",
169                help: "count of batches committed via txn",
170            )),
171            commit_bytes: registry.register(metric!(
172                name: "mz_txn_batch_commit_bytes",
173                help: "total bytes committed via txn",
174            )),
175            unapplied_count: registry.register(metric!(
176                name: "mz_txn_batch_unapplied_count",
177                help: "count of batches committed via txn but not yet applied",
178            )),
179            unapplied_bytes: registry.register(metric!(
180                name: "mz_txn_batch_unapplied_bytes",
181                help: "total bytes committed via txn but not yet applied",
182            )),
183            unapplied_min_ts: registry.register(metric!(
184                name: "mz_txn_batch_unapplied_min_ts",
185                help: "minimum ts of txn committed via txn but not yet applied",
186            )),
187        }
188    }
189}