1use std::time::{Duration, Instant};
13
14use mz_ore::metric;
15use mz_ore::metrics::raw::{CounterVec, IntCounterVec};
16use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry};
17use mz_postgres_client::metrics::PostgresClientMetrics;
18
19use crate::retry::RetryStream;
20
21pub struct Metrics {
26 _vecs: MetricsVecs,
27
28 pub oracle: OracleMetrics,
31
32 pub batching: BatchingMetrics,
36
37 pub retries: RetriesMetrics,
39
40 pub postgres_client: PostgresClientMetrics,
42}
43
44impl std::fmt::Debug for Metrics {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 f.debug_struct("Metrics").finish_non_exhaustive()
47 }
48}
49
50impl Metrics {
51 pub fn new(registry: &MetricsRegistry) -> Self {
53 let vecs = MetricsVecs::new(registry);
54
55 Metrics {
56 oracle: vecs.oracle_metrics(),
57 batching: vecs.batching_metrics(),
58 retries: vecs.retries_metrics(),
59 postgres_client: PostgresClientMetrics::new(registry, "mz_ts_oracle"),
60 _vecs: vecs,
61 }
62 }
63}
64
65#[derive(Debug)]
66struct MetricsVecs {
67 external_op_started: IntCounterVec,
68 external_op_succeeded: IntCounterVec,
69 external_op_failed: IntCounterVec,
70 external_op_seconds: CounterVec,
71
72 retry_started: IntCounterVec,
73 retry_finished: IntCounterVec,
74 retry_retries: IntCounterVec,
75 retry_sleep_seconds: CounterVec,
76
77 batched_op_count: IntCounterVec,
78 batches_count: IntCounterVec,
79}
80
81impl MetricsVecs {
82 fn new(registry: &MetricsRegistry) -> Self {
83 MetricsVecs {
84 external_op_started: registry.register(metric!(
85 name: "mz_ts_oracle_started_count",
86 help: "count of oracle operations started",
87 var_labels: ["op"],
88 )),
89 external_op_succeeded: registry.register(metric!(
90 name: "mz_ts_oracle_succeeded_count",
91 help: "count of oracle operations succeeded",
92 var_labels: ["op"],
93 )),
94 external_op_failed: registry.register(metric!(
95 name: "mz_ts_oracle_failed_count",
96 help: "count of oracle operations failed",
97 var_labels: ["op"],
98 )),
99 external_op_seconds: registry.register(metric!(
100 name: "mz_ts_oracle_seconds",
101 help: "time spent in oracle operations",
102 var_labels: ["op"],
103 )),
104
105 retry_started: registry.register(metric!(
106 name: "mz_ts_oracle_retry_started_count",
107 help: "count of retry loops started",
108 var_labels: ["op"],
109 )),
110 retry_finished: registry.register(metric!(
111 name: "mz_ts_oracle_retry_finished_count",
112 help: "count of retry loops finished",
113 var_labels: ["op"],
114 )),
115 retry_retries: registry.register(metric!(
116 name: "mz_ts_oracle_retry_retries_count",
117 help: "count of total attempts by retry loops",
118 var_labels: ["op"],
119 )),
120 retry_sleep_seconds: registry.register(metric!(
121 name: "mz_ts_oracle_retry_sleep_seconds",
122 help: "time spent in retry loop backoff",
123 var_labels: ["op"],
124 )),
125
126 batched_op_count: registry.register(metric!(
127 name: "mz_ts_oracle_batched_op_count",
128 help: "count of batched operations",
129 var_labels: ["op"],
130 )),
131
132 batches_count: registry.register(metric!(
133 name: "mz_ts_oracle_batches_count",
134 help: "count of batches of operations",
135 var_labels: ["op"],
136 )),
137 }
138 }
139
140 fn oracle_metrics(&self) -> OracleMetrics {
141 OracleMetrics {
142 write_ts: self.external_op_metrics("write_ts"),
143 peek_write_ts: self.external_op_metrics("peek_write_ts"),
144 read_ts: self.external_op_metrics("read_ts"),
145 apply_write: self.external_op_metrics("apply_write"),
146 }
147 }
148
149 fn external_op_metrics(&self, op: &str) -> ExternalOpMetrics {
150 ExternalOpMetrics {
151 started: self.external_op_started.with_label_values(&[op]),
152 succeeded: self.external_op_succeeded.with_label_values(&[op]),
153 failed: self.external_op_failed.with_label_values(&[op]),
154 seconds: self.external_op_seconds.with_label_values(&[op]),
155 }
156 }
157
158 fn batching_metrics(&self) -> BatchingMetrics {
159 BatchingMetrics {
160 read_ts: self.batched_op_metrics("read_ts"),
161 }
162 }
163
164 fn batched_op_metrics(&self, op: &str) -> BatchedOpMetrics {
165 BatchedOpMetrics {
166 ops_count: self.batched_op_count.with_label_values(&[op]),
167 batches_count: self.batches_count.with_label_values(&[op]),
168 }
169 }
170
171 fn retries_metrics(&self) -> RetriesMetrics {
172 RetriesMetrics {
173 open: self.retry_metrics("open"),
174 get_all_timelines: self.retry_metrics("get_all_timelines"),
175 write_ts: self.retry_metrics("write_ts"),
176 peek_write_ts: self.retry_metrics("peek_write_ts"),
177 read_ts: self.retry_metrics("read_ts"),
178 apply_write: self.retry_metrics("apply_write"),
179 }
180 }
181
182 fn retry_metrics(&self, name: &str) -> RetryMetrics {
183 RetryMetrics {
184 name: name.to_owned(),
185 started: self.retry_started.with_label_values(&[name]),
186 finished: self.retry_finished.with_label_values(&[name]),
187 retries: self.retry_retries.with_label_values(&[name]),
188 sleep_seconds: self.retry_sleep_seconds.with_label_values(&[name]),
189 }
190 }
191}
192
193#[derive(Debug)]
194pub struct ExternalOpMetrics {
195 started: IntCounter,
196 succeeded: IntCounter,
197 failed: IntCounter,
198 seconds: Counter,
199}
200
201impl ExternalOpMetrics {
202 pub(crate) async fn run_op<R, F, OpFn>(&self, op_fn: OpFn) -> Result<R, anyhow::Error>
203 where
204 F: std::future::Future<Output = Result<R, anyhow::Error>>,
205 OpFn: FnOnce() -> F,
206 {
207 self.started.inc();
208 let start = Instant::now();
209 let res = op_fn().await;
210 let elapsed_seconds = start.elapsed().as_secs_f64();
211 self.seconds.inc_by(elapsed_seconds);
212 match res.as_ref() {
213 Ok(_) => self.succeeded.inc(),
214 Err(_err) => {
215 self.failed.inc();
216 }
217 };
218 res
219 }
220}
221
222#[derive(Debug)]
223pub struct OracleMetrics {
224 pub write_ts: ExternalOpMetrics,
225 pub peek_write_ts: ExternalOpMetrics,
226 pub read_ts: ExternalOpMetrics,
227 pub apply_write: ExternalOpMetrics,
228}
229
230#[derive(Debug)]
231pub struct BatchedOpMetrics {
232 pub ops_count: IntCounter,
233 pub batches_count: IntCounter,
234}
235
236#[derive(Debug)]
237pub struct BatchingMetrics {
238 pub read_ts: BatchedOpMetrics,
239}
240
241#[derive(Debug)]
242pub struct RetryMetrics {
243 pub(crate) name: String,
244 pub(crate) started: IntCounter,
245 pub(crate) finished: IntCounter,
246 pub(crate) retries: IntCounter,
247 pub(crate) sleep_seconds: Counter,
248}
249
250impl RetryMetrics {
251 pub(crate) fn stream(&self, retry: RetryStream) -> MetricsRetryStream {
252 MetricsRetryStream::new(retry, self)
253 }
254}
255
256#[derive(Debug)]
257pub struct RetriesMetrics {
258 pub open: RetryMetrics,
259 pub get_all_timelines: RetryMetrics,
260 pub(crate) write_ts: RetryMetrics,
261 pub(crate) peek_write_ts: RetryMetrics,
262 pub(crate) read_ts: RetryMetrics,
263 pub(crate) apply_write: RetryMetrics,
264}
265
266struct IncOnDrop(IntCounter);
267
268impl Drop for IncOnDrop {
269 fn drop(&mut self) {
270 self.0.inc()
271 }
272}
273
274pub struct MetricsRetryStream {
275 retry: RetryStream,
276 pub(crate) retries: IntCounter,
277 sleep_seconds: Counter,
278 _finished: IncOnDrop,
279}
280
281impl MetricsRetryStream {
282 pub fn new(retry: RetryStream, metrics: &RetryMetrics) -> Self {
283 metrics.started.inc();
284 MetricsRetryStream {
285 retry,
286 retries: metrics.retries.clone(),
287 sleep_seconds: metrics.sleep_seconds.clone(),
288 _finished: IncOnDrop(metrics.finished.clone()),
289 }
290 }
291
292 pub fn attempt(&self) -> usize {
294 self.retry.attempt()
295 }
296
297 pub fn next_sleep(&self) -> Duration {
299 self.retry.next_sleep()
300 }
301
302 pub async fn sleep(self) -> Self {
307 self.retries.inc();
308 self.sleep_seconds
309 .inc_by(self.retry.next_sleep().as_secs_f64());
310 let retry = self.retry.sleep().await;
311 MetricsRetryStream {
312 retry,
313 retries: self.retries,
314 sleep_seconds: self.sleep_seconds,
315 _finished: self._finished,
316 }
317 }
318}