mz_timestamp_oracle/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Prometheus monitoring metrics.
11
12use std::time::{Duration, Instant};
13
14use mz_ore::metric;
15use mz_ore::metrics::raw::{CounterVec, IntCounterVec};
16use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry};
17use mz_postgres_client::metrics::PostgresClientMetrics;
18
19use crate::retry::RetryStream;
20
21/// Prometheus monitoring metrics for timestamp oracles.
22///
23/// Intentionally not Clone because we expect this to be passed around in an
24/// Arc.
25pub struct Metrics {
26    _vecs: MetricsVecs,
27
28    /// Metrics for
29    /// [`TimestampOracle`](crate::TimestampOracle).
30    pub oracle: OracleMetrics,
31
32    /// Metrics recording how many operations we batch into one oracle call, for
33    /// those operations that _do_ support batching, and only when using the
34    /// `BatchingTimestampOracle` wrapper.
35    pub batching: BatchingMetrics,
36
37    /// Metrics for each retry loop.
38    pub retries: RetriesMetrics,
39
40    /// Metrics for [`PostgresClient`](mz_postgres_client::PostgresClient).
41    pub postgres_client: PostgresClientMetrics,
42}
43
44impl std::fmt::Debug for Metrics {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        f.debug_struct("Metrics").finish_non_exhaustive()
47    }
48}
49
50impl Metrics {
51    /// Returns a new [Metrics] instance connected to the given registry.
52    pub fn new(registry: &MetricsRegistry) -> Self {
53        let vecs = MetricsVecs::new(registry);
54
55        Metrics {
56            oracle: vecs.oracle_metrics(),
57            batching: vecs.batching_metrics(),
58            retries: vecs.retries_metrics(),
59            postgres_client: PostgresClientMetrics::new(registry, "mz_ts_oracle"),
60            _vecs: vecs,
61        }
62    }
63}
64
65#[derive(Debug)]
66struct MetricsVecs {
67    external_op_started: IntCounterVec,
68    external_op_succeeded: IntCounterVec,
69    external_op_failed: IntCounterVec,
70    external_op_seconds: CounterVec,
71
72    retry_started: IntCounterVec,
73    retry_finished: IntCounterVec,
74    retry_retries: IntCounterVec,
75    retry_sleep_seconds: CounterVec,
76
77    batched_op_count: IntCounterVec,
78    batches_count: IntCounterVec,
79}
80
81impl MetricsVecs {
82    fn new(registry: &MetricsRegistry) -> Self {
83        MetricsVecs {
84            external_op_started: registry.register(metric!(
85                name: "mz_ts_oracle_started_count",
86                help: "count of oracle operations started",
87                var_labels: ["op"],
88            )),
89            external_op_succeeded: registry.register(metric!(
90                name: "mz_ts_oracle_succeeded_count",
91                help: "count of oracle operations succeeded",
92                var_labels: ["op"],
93            )),
94            external_op_failed: registry.register(metric!(
95                name: "mz_ts_oracle_failed_count",
96                help: "count of oracle operations failed",
97                var_labels: ["op"],
98            )),
99            external_op_seconds: registry.register(metric!(
100                name: "mz_ts_oracle_seconds",
101                help: "time spent in oracle operations",
102                var_labels: ["op"],
103            )),
104
105            retry_started: registry.register(metric!(
106                name: "mz_ts_oracle_retry_started_count",
107                help: "count of retry loops started",
108                var_labels: ["op"],
109            )),
110            retry_finished: registry.register(metric!(
111                name: "mz_ts_oracle_retry_finished_count",
112                help: "count of retry loops finished",
113                var_labels: ["op"],
114            )),
115            retry_retries: registry.register(metric!(
116                name: "mz_ts_oracle_retry_retries_count",
117                help: "count of total attempts by retry loops",
118                var_labels: ["op"],
119            )),
120            retry_sleep_seconds: registry.register(metric!(
121                name: "mz_ts_oracle_retry_sleep_seconds",
122                help: "time spent in retry loop backoff",
123                var_labels: ["op"],
124            )),
125
126            batched_op_count: registry.register(metric!(
127                name: "mz_ts_oracle_batched_op_count",
128                help: "count of batched operations",
129                var_labels: ["op"],
130            )),
131
132            batches_count: registry.register(metric!(
133                name: "mz_ts_oracle_batches_count",
134                help: "count of batches of operations",
135                var_labels: ["op"],
136            )),
137        }
138    }
139
140    fn oracle_metrics(&self) -> OracleMetrics {
141        OracleMetrics {
142            write_ts: self.external_op_metrics("write_ts"),
143            peek_write_ts: self.external_op_metrics("peek_write_ts"),
144            read_ts: self.external_op_metrics("read_ts"),
145            apply_write: self.external_op_metrics("apply_write"),
146        }
147    }
148
149    fn external_op_metrics(&self, op: &str) -> ExternalOpMetrics {
150        ExternalOpMetrics {
151            started: self.external_op_started.with_label_values(&[op]),
152            succeeded: self.external_op_succeeded.with_label_values(&[op]),
153            failed: self.external_op_failed.with_label_values(&[op]),
154            seconds: self.external_op_seconds.with_label_values(&[op]),
155        }
156    }
157
158    fn batching_metrics(&self) -> BatchingMetrics {
159        BatchingMetrics {
160            read_ts: self.batched_op_metrics("read_ts"),
161        }
162    }
163
164    fn batched_op_metrics(&self, op: &str) -> BatchedOpMetrics {
165        BatchedOpMetrics {
166            ops_count: self.batched_op_count.with_label_values(&[op]),
167            batches_count: self.batches_count.with_label_values(&[op]),
168        }
169    }
170
171    fn retries_metrics(&self) -> RetriesMetrics {
172        RetriesMetrics {
173            open: self.retry_metrics("open"),
174            get_all_timelines: self.retry_metrics("get_all_timelines"),
175            write_ts: self.retry_metrics("write_ts"),
176            peek_write_ts: self.retry_metrics("peek_write_ts"),
177            read_ts: self.retry_metrics("read_ts"),
178            apply_write: self.retry_metrics("apply_write"),
179        }
180    }
181
182    fn retry_metrics(&self, name: &str) -> RetryMetrics {
183        RetryMetrics {
184            name: name.to_owned(),
185            started: self.retry_started.with_label_values(&[name]),
186            finished: self.retry_finished.with_label_values(&[name]),
187            retries: self.retry_retries.with_label_values(&[name]),
188            sleep_seconds: self.retry_sleep_seconds.with_label_values(&[name]),
189        }
190    }
191}
192
193#[derive(Debug)]
194pub struct ExternalOpMetrics {
195    started: IntCounter,
196    succeeded: IntCounter,
197    failed: IntCounter,
198    seconds: Counter,
199}
200
201impl ExternalOpMetrics {
202    pub(crate) async fn run_op<R, F, OpFn>(&self, op_fn: OpFn) -> Result<R, anyhow::Error>
203    where
204        F: std::future::Future<Output = Result<R, anyhow::Error>>,
205        OpFn: FnOnce() -> F,
206    {
207        self.started.inc();
208        let start = Instant::now();
209        let res = op_fn().await;
210        let elapsed_seconds = start.elapsed().as_secs_f64();
211        self.seconds.inc_by(elapsed_seconds);
212        match res.as_ref() {
213            Ok(_) => self.succeeded.inc(),
214            Err(_err) => {
215                self.failed.inc();
216            }
217        };
218        res
219    }
220}
221
222#[derive(Debug)]
223pub struct OracleMetrics {
224    pub write_ts: ExternalOpMetrics,
225    pub peek_write_ts: ExternalOpMetrics,
226    pub read_ts: ExternalOpMetrics,
227    pub apply_write: ExternalOpMetrics,
228}
229
230#[derive(Debug)]
231pub struct BatchedOpMetrics {
232    pub ops_count: IntCounter,
233    pub batches_count: IntCounter,
234}
235
236#[derive(Debug)]
237pub struct BatchingMetrics {
238    pub read_ts: BatchedOpMetrics,
239}
240
241#[derive(Debug)]
242pub struct RetryMetrics {
243    pub(crate) name: String,
244    pub(crate) started: IntCounter,
245    pub(crate) finished: IntCounter,
246    pub(crate) retries: IntCounter,
247    pub(crate) sleep_seconds: Counter,
248}
249
250impl RetryMetrics {
251    pub(crate) fn stream(&self, retry: RetryStream) -> MetricsRetryStream {
252        MetricsRetryStream::new(retry, self)
253    }
254}
255
256#[derive(Debug)]
257pub struct RetriesMetrics {
258    pub open: RetryMetrics,
259    pub get_all_timelines: RetryMetrics,
260    pub(crate) write_ts: RetryMetrics,
261    pub(crate) peek_write_ts: RetryMetrics,
262    pub(crate) read_ts: RetryMetrics,
263    pub(crate) apply_write: RetryMetrics,
264}
265
266struct IncOnDrop(IntCounter);
267
268impl Drop for IncOnDrop {
269    fn drop(&mut self) {
270        self.0.inc()
271    }
272}
273
274pub struct MetricsRetryStream {
275    retry: RetryStream,
276    pub(crate) retries: IntCounter,
277    sleep_seconds: Counter,
278    _finished: IncOnDrop,
279}
280
281impl MetricsRetryStream {
282    pub fn new(retry: RetryStream, metrics: &RetryMetrics) -> Self {
283        metrics.started.inc();
284        MetricsRetryStream {
285            retry,
286            retries: metrics.retries.clone(),
287            sleep_seconds: metrics.sleep_seconds.clone(),
288            _finished: IncOnDrop(metrics.finished.clone()),
289        }
290    }
291
292    /// How many times [Self::sleep] has been called.
293    pub fn attempt(&self) -> usize {
294        self.retry.attempt()
295    }
296
297    /// The next sleep (without jitter for easy printing in logs).
298    pub fn next_sleep(&self) -> Duration {
299        self.retry.next_sleep()
300    }
301
302    /// Executes the next sleep in the series.
303    ///
304    /// This isn't cancel-safe, so it consumes and returns self, to prevent
305    /// accidental mis-use.
306    pub async fn sleep(self) -> Self {
307        self.retries.inc();
308        self.sleep_seconds
309            .inc_by(self.retry.next_sleep().as_secs_f64());
310        let retry = self.retry.sleep().await;
311        MetricsRetryStream {
312            retry,
313            retries: self.retries,
314            sleep_seconds: self.sleep_seconds,
315            _finished: self._finished,
316        }
317    }
318}