1use std::future::Future;
13use std::time::Instant;
14
15use mz_ore::metric;
16use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry, UIntGauge};
17use prometheus::{CounterVec, IntCounterVec, IntGauge};
18
/// Top-level collection of txn metrics: a data shard gauge, one set of
/// per-operation metrics for each instrumented operation, and batch metrics.
pub struct Metrics {
    /// Gauge registered as `mz_txn_data_shard_count`: count of data shards
    /// registered to the txn set.
    pub(crate) data_shard_count: UIntGauge,

    /// Per-op metrics recorded under the `op` label value `register`.
    pub(crate) register: FallibleOpMetrics,
    /// Per-op metrics recorded under the `op` label value `commit`.
    pub(crate) commit: FallibleOpMetrics,
    /// Per-op metrics recorded under the `op` label value `forget`.
    pub(crate) forget: FallibleOpMetrics,
    /// Per-op metrics recorded under the `op` label value `forget_all`.
    pub(crate) forget_all: FallibleOpMetrics,
    /// Per-op metrics recorded under the `op` label value `apply_le`.
    pub(crate) apply_le: InfallibleOpMetrics,
    /// Per-op metrics recorded under the `op` label value `compact_to`.
    pub(crate) compact_to: InfallibleOpMetrics,

    /// Counters and gauges describing batches committed via txn.
    pub(crate) batches: BatchMetrics,
}
32
33impl std::fmt::Debug for Metrics {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 f.debug_struct("Metrics").finish_non_exhaustive()
36 }
37}
38
39impl Metrics {
40 pub fn new(registry: &MetricsRegistry) -> Self {
42 let ops = OpsMetrics::new(registry);
43 Metrics {
44 data_shard_count: registry.register(metric!(
45 name: "mz_txn_data_shard_count",
46 help: "count of data shards registered to the txn set",
47 )),
48 register: ops.fallible("register"),
49 commit: ops.fallible("commit"),
50 forget: ops.fallible("forget"),
51 forget_all: ops.fallible("forget_all"),
52 apply_le: ops.infallible("apply_le"),
53 compact_to: ops.infallible("compact_to"),
54 batches: BatchMetrics::new(registry),
55 }
56 }
57}
58
/// Labeled metric vectors shared by all txn operations. Each vector has a
/// single variable label, `op`, from which the per-operation handles are
/// obtained.
struct OpsMetrics {
    /// `mz_txn_op_started_count`: count of times a txn operation started.
    started_count: IntCounterVec,
    /// `mz_txn_op_retry_count`: count of times a txn operation retried.
    retry_count: IntCounterVec,
    /// `mz_txn_op_succeeded_count`: count of times a txn operation succeeded.
    succeeded_count: IntCounterVec,
    /// `mz_txn_op_errored_count`: count of times a txn operation errored.
    errored_count: IntCounterVec,
    /// `mz_txn_op_duration_seconds`: time spent running a txn operation.
    duration_seconds: CounterVec,
}
66
67impl OpsMetrics {
68 fn new(registry: &MetricsRegistry) -> Self {
69 OpsMetrics {
70 started_count: registry.register(metric!(
71 name: "mz_txn_op_started_count",
72 help: "count of times a txn operation started",
73 var_labels: ["op"],
74 )),
75 retry_count: registry.register(metric!(
76 name: "mz_txn_op_retry_count",
77 help: "count of times a txn operation retried",
78 var_labels: ["op"],
79 )),
80 succeeded_count: registry.register(metric!(
81 name: "mz_txn_op_succeeded_count",
82 help: "count of times a txn operation succeeded",
83 var_labels: ["op"],
84 )),
85 errored_count: registry.register(metric!(
86 name: "mz_txn_op_errored_count",
87 help: "count of times a txn operation errored",
88 var_labels: ["op"],
89 )),
90 duration_seconds: registry.register(metric!(
91 name: "mz_txn_op_duration_seconds",
92 help: "time spent running a txn operation",
93 var_labels: ["op"],
94 )),
95 }
96 }
97
98 fn fallible(&self, name: &str) -> FallibleOpMetrics {
99 FallibleOpMetrics {
100 started_count: self.started_count.with_label_values(&[name]),
101 retry_count: self.retry_count.with_label_values(&[name]),
102 succeeded_count: self.succeeded_count.with_label_values(&[name]),
103 errored_count: self.errored_count.with_label_values(&[name]),
104 duration_seconds: self.duration_seconds.with_label_values(&[name]),
105 }
106 }
107
108 fn infallible(&self, name: &str) -> InfallibleOpMetrics {
109 InfallibleOpMetrics {
110 started_count: self.started_count.with_label_values(&[name]),
111 succeeded_count: self.succeeded_count.with_label_values(&[name]),
112 duration_seconds: self.duration_seconds.with_label_values(&[name]),
113 }
114 }
115}
116
/// Metric handles for a single txn operation whose future resolves to a
/// `Result`.
pub(crate) struct FallibleOpMetrics {
    /// Incremented once each time `run` is called, before the operation is awaited.
    started_count: IntCounter,
    /// Not updated by `run`; exposed as `pub(crate)`, presumably so callers
    /// can bump it themselves on retry (confirm against call sites).
    pub(crate) retry_count: IntCounter,
    /// Incremented when the operation resolves to `Ok`.
    succeeded_count: IntCounter,
    /// Incremented when the operation resolves to `Err`.
    errored_count: IntCounter,
    /// Accumulated wall-clock seconds spent in `run`.
    duration_seconds: Counter,
}
124
125impl FallibleOpMetrics {
126 pub(crate) async fn run<R, E>(&self, op: impl Future<Output = Result<R, E>>) -> Result<R, E> {
127 let start = Instant::now();
128 self.started_count.inc();
129 let res = op.await;
130 match res {
131 Ok(_) => self.succeeded_count.inc(),
132 Err(_) => self.errored_count.inc(),
133 }
134 self.duration_seconds.inc_by(start.elapsed().as_secs_f64());
135 res
136 }
137}
138
/// Metric handles for a single txn operation whose future resolves to a plain
/// value rather than a `Result`.
pub(crate) struct InfallibleOpMetrics {
    /// Incremented once each time `run` is called, before the operation is awaited.
    started_count: IntCounter,
    /// Incremented once the operation has resolved.
    succeeded_count: IntCounter,
    /// Accumulated wall-clock seconds spent in `run`.
    duration_seconds: Counter,
}
144
145impl InfallibleOpMetrics {
146 pub(crate) async fn run<R>(&self, op: impl Future<Output = R>) -> R {
147 let start = Instant::now();
148 self.started_count.inc();
149 let res = op.await;
150 self.succeeded_count.inc();
151 self.duration_seconds.inc_by(start.elapsed().as_secs_f64());
152 res
153 }
154}
155
/// Metrics about batches committed via txn.
pub(crate) struct BatchMetrics {
    /// `mz_txn_batch_commit_count`: count of batches committed via txn.
    pub(crate) commit_count: IntCounter,
    /// `mz_txn_batch_commit_bytes`: total bytes committed via txn.
    pub(crate) commit_bytes: IntCounter,
    /// `mz_txn_batch_unapplied_count`: count of batches committed via txn but
    /// not yet applied.
    pub(crate) unapplied_count: UIntGauge,
    /// `mz_txn_batch_unapplied_bytes`: total bytes committed via txn but not
    /// yet applied.
    pub(crate) unapplied_bytes: UIntGauge,
    /// `mz_txn_batch_unapplied_min_ts`: minimum ts of txn committed via txn
    /// but not yet applied.
    pub(crate) unapplied_min_ts: IntGauge,
}
163
164impl BatchMetrics {
165 fn new(registry: &MetricsRegistry) -> Self {
166 BatchMetrics {
167 commit_count: registry.register(metric!(
168 name: "mz_txn_batch_commit_count",
169 help: "count of batches committed via txn",
170 )),
171 commit_bytes: registry.register(metric!(
172 name: "mz_txn_batch_commit_bytes",
173 help: "total bytes committed via txn",
174 )),
175 unapplied_count: registry.register(metric!(
176 name: "mz_txn_batch_unapplied_count",
177 help: "count of batches committed via txn but not yet applied",
178 )),
179 unapplied_bytes: registry.register(metric!(
180 name: "mz_txn_batch_unapplied_bytes",
181 help: "total bytes committed via txn but not yet applied",
182 )),
183 unapplied_min_ts: registry.register(metric!(
184 name: "mz_txn_batch_unapplied_min_ts",
185 help: "minimum ts of txn committed via txn but not yet applied",
186 )),
187 }
188 }
189}