mz_storage_client/
metrics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics for the storage controller components.
11
12use std::sync::Arc;
13use std::time::Duration;
14
15use mz_cluster_client::ReplicaId;
16use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
17use mz_ore::cast::CastFrom;
18use mz_ore::metric;
19use mz_ore::metrics::{
20    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, MetricsRegistry,
21    UIntGaugeVec,
22};
23use mz_repr::GlobalId;
24use mz_service::transport;
25use mz_storage_types::instances::StorageInstanceId;
26use prometheus::core::{AtomicF64, AtomicU64};
27
28use crate::client::{StorageCommand, StorageResponse};
29
30type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
31pub type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
32
33/// Storage controller metrics
34#[derive(Debug, Clone)]
35pub struct StorageControllerMetrics {
36    // storage protocol
37    commands_total: IntCounterVec,
38    command_message_bytes_total: IntCounterVec,
39    responses_total: IntCounterVec,
40    response_message_bytes_total: IntCounterVec,
41
42    regressed_offset_known: IntCounterVec,
43    history_command_count: UIntGaugeVec,
44
45    // replica connections
46    connected_replica_count: UIntGaugeVec,
47    replica_connects_total: IntCounterVec,
48    replica_connect_wait_time_seconds_total: CounterVec,
49
50    /// Metrics shared with the compute controller.
51    shared: ControllerMetrics,
52}
53
54impl StorageControllerMetrics {
55    pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
56        Self {
57            commands_total: metrics_registry.register(metric!(
58                name: "mz_storage_commands_total",
59                help: "The total number of storage commands sent.",
60                var_labels: ["instance_id", "replica_id", "command_type"],
61            )),
62            command_message_bytes_total: metrics_registry.register(metric!(
63                name: "mz_storage_command_message_bytes_total",
64                help: "The total number of bytes sent in storage command messages.",
65                var_labels: ["instance_id", "replica_id"],
66            )),
67            responses_total: metrics_registry.register(metric!(
68                name: "mz_storage_responses_total",
69                help: "The total number of storage responses received.",
70                var_labels: ["instance_id", "replica_id", "response_type"],
71            )),
72            response_message_bytes_total: metrics_registry.register(metric!(
73                name: "mz_storage_response_message_bytes_total",
74                help: "The total number of bytes received in storage response messages.",
75                var_labels: ["instance_id", "replica_id"],
76            )),
77            regressed_offset_known: metrics_registry.register(metric!(
78                name: "mz_storage_regressed_offset_known",
79                help: "number of regressed offset_known stats for this id",
80                var_labels: ["id"],
81            )),
82            history_command_count: metrics_registry.register(metric!(
83                name: "mz_storage_controller_history_command_count",
84                help: "The number of commands in the controller's command history.",
85                var_labels: ["instance_id", "command_type"],
86            )),
87            connected_replica_count: metrics_registry.register(metric!(
88                name: "mz_storage_controller_connected_replica_count",
89                help: "The number of replicas successfully connected to the storage controller.",
90                var_labels: ["instance_id"],
91            )),
92            replica_connects_total: metrics_registry.register(metric!(
93                name: "mz_storage_controller_replica_connects_total",
94                help: "The total number of replica (re-)connections made by the storage controller.",
95                var_labels: ["instance_id", "replica_id"],
96            )),
97            replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
98                name: "mz_storage_controller_replica_connect_wait_time_seconds_total",
99                help: "The total time the storage controller spent waiting for replica (re-)connection.",
100                var_labels: ["instance_id", "replica_id"],
101            )),
102
103            shared,
104        }
105    }
106
107    pub fn regressed_offset_known(
108        &self,
109        id: mz_repr::GlobalId,
110    ) -> DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>> {
111        self.regressed_offset_known
112            .get_delete_on_drop_metric(vec![id.to_string()])
113    }
114
115    pub fn wallclock_lag_metrics(
116        &self,
117        id: GlobalId,
118        instance_id: Option<StorageInstanceId>,
119    ) -> WallclockLagMetrics {
120        self.shared
121            .wallclock_lag_metrics(id.to_string(), instance_id.map(|x| x.to_string()), None)
122    }
123
124    pub fn for_instance(&self, id: StorageInstanceId) -> InstanceMetrics {
125        let connected_replica_count = self
126            .connected_replica_count
127            .get_delete_on_drop_metric(vec![id.to_string()]);
128
129        InstanceMetrics {
130            instance_id: id,
131            metrics: self.clone(),
132            connected_replica_count,
133        }
134    }
135}
136
137#[derive(Debug)]
138pub struct InstanceMetrics {
139    instance_id: StorageInstanceId,
140    metrics: StorageControllerMetrics,
141    /// Gauge tracking the number of connected replicas.
142    pub connected_replica_count: UIntGauge,
143}
144
145impl InstanceMetrics {
146    pub fn for_replica(&self, id: ReplicaId) -> ReplicaMetrics {
147        let labels = vec![self.instance_id.to_string(), id.to_string()];
148        let extended_labels = |extra: &str| {
149            labels
150                .iter()
151                .cloned()
152                .chain([extra.into()])
153                .collect::<Vec<_>>()
154        };
155
156        ReplicaMetrics {
157            inner: Arc::new(ReplicaMetricsInner {
158                commands_total: CommandMetrics::build(|typ| {
159                    let labels = extended_labels(typ);
160                    self.metrics
161                        .commands_total
162                        .get_delete_on_drop_metric(labels)
163                }),
164                responses_total: ResponseMetrics::build(|typ| {
165                    let labels = extended_labels(typ);
166                    self.metrics
167                        .responses_total
168                        .get_delete_on_drop_metric(labels)
169                }),
170                command_message_bytes_total: self
171                    .metrics
172                    .command_message_bytes_total
173                    .get_delete_on_drop_metric(labels.clone()),
174                response_message_bytes_total: self
175                    .metrics
176                    .response_message_bytes_total
177                    .get_delete_on_drop_metric(labels.clone()),
178                replica_connects_total: self
179                    .metrics
180                    .replica_connects_total
181                    .get_delete_on_drop_metric(labels.clone()),
182                replica_connect_wait_time_seconds_total: self
183                    .metrics
184                    .replica_connect_wait_time_seconds_total
185                    .get_delete_on_drop_metric(labels),
186            }),
187        }
188    }
189
190    pub fn for_history(&self) -> HistoryMetrics {
191        let command_counts = CommandMetrics::build(|typ| {
192            let labels = vec![self.instance_id.to_string(), typ.to_string()];
193            self.metrics
194                .history_command_count
195                .get_delete_on_drop_metric(labels)
196        });
197
198        HistoryMetrics { command_counts }
199    }
200}
201
202#[derive(Debug)]
203struct ReplicaMetricsInner {
204    commands_total: CommandMetrics<IntCounter>,
205    command_message_bytes_total: IntCounter,
206    responses_total: ResponseMetrics<IntCounter>,
207    response_message_bytes_total: IntCounter,
208    /// Counter tracking the total number of (re-)connects.
209    replica_connects_total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
210    /// Counter tracking the total time spent waiting for (re-)connects.
211    replica_connect_wait_time_seconds_total: DeleteOnDropCounter<AtomicF64, Vec<String>>,
212}
213
214/// Per-instance metrics
215#[derive(Debug, Clone)]
216pub struct ReplicaMetrics {
217    inner: Arc<ReplicaMetricsInner>,
218}
219
220impl ReplicaMetrics {
221    /// Observe a successful replica connection.
222    pub fn observe_connect(&self) {
223        self.inner.replica_connects_total.inc();
224    }
225
226    /// Observe time spent waiting for a replica connection.
227    pub fn observe_connect_time(&self, wait_time: Duration) {
228        self.inner
229            .replica_connect_wait_time_seconds_total
230            .inc_by(wait_time.as_secs_f64());
231    }
232}
233
234impl<T> transport::Metrics<StorageCommand<T>, StorageResponse<T>> for ReplicaMetrics {
235    fn bytes_sent(&mut self, len: usize) {
236        self.inner
237            .command_message_bytes_total
238            .inc_by(u64::cast_from(len));
239    }
240
241    fn bytes_received(&mut self, len: usize) {
242        self.inner
243            .response_message_bytes_total
244            .inc_by(u64::cast_from(len));
245    }
246
247    fn message_sent(&mut self, msg: &StorageCommand<T>) {
248        self.inner.commands_total.for_command(msg).inc();
249    }
250
251    fn message_received(&mut self, msg: &StorageResponse<T>) {
252        self.inner.responses_total.for_response(msg).inc();
253    }
254}
255
/// One metric per `StorageCommand` variant.
///
/// [`CommandMetrics::for_command`] selects the metric that matches a given command.
#[derive(Debug, Clone)]
pub struct CommandMetrics<M> {
    /// Metric for `StorageCommand::Hello`.
    pub hello: M,
    /// Metric for `StorageCommand::InitializationComplete`.
    pub initialization_complete: M,
    /// Metric for `StorageCommand::AllowWrites`.
    pub allow_writes: M,
    /// Metric for `StorageCommand::UpdateConfiguration`.
    pub update_configuration: M,
    /// Metric for `StorageCommand::RunIngestion`.
    pub run_ingestion: M,
    /// Metric for `StorageCommand::AllowCompaction`.
    pub allow_compaction: M,
    /// Metric for `StorageCommand::RunSink`.
    pub run_sink: M,
    /// Metric for `StorageCommand::RunOneshotIngestion`.
    pub run_oneshot_ingestion: M,
    /// Metric for `StorageCommand::CancelOneshotIngestion`.
    pub cancel_oneshot_ingestion: M,
}
278
279impl<M> CommandMetrics<M> {
280    fn build<F>(build_metric: F) -> Self
281    where
282        F: Fn(&str) -> M,
283    {
284        Self {
285            hello: build_metric("hello"),
286            initialization_complete: build_metric("initialization_complete"),
287            allow_writes: build_metric("allow_writes"),
288            update_configuration: build_metric("update_configuration"),
289            run_ingestion: build_metric("run_ingestion"),
290            allow_compaction: build_metric("allow_compaction"),
291            run_sink: build_metric("run_sink"),
292            run_oneshot_ingestion: build_metric("run_oneshot_ingestion"),
293            cancel_oneshot_ingestion: build_metric("cancel_oneshot_ingestion"),
294        }
295    }
296
297    fn for_all<F>(&self, f: F)
298    where
299        F: Fn(&M),
300    {
301        f(&self.hello);
302        f(&self.initialization_complete);
303        f(&self.allow_writes);
304        f(&self.update_configuration);
305        f(&self.run_ingestion);
306        f(&self.allow_compaction);
307        f(&self.run_sink);
308        f(&self.run_oneshot_ingestion);
309        f(&self.cancel_oneshot_ingestion);
310    }
311
312    pub fn for_command<T>(&self, command: &StorageCommand<T>) -> &M {
313        use StorageCommand::*;
314
315        match command {
316            Hello { .. } => &self.hello,
317            InitializationComplete => &self.initialization_complete,
318            AllowWrites => &self.allow_writes,
319            UpdateConfiguration(..) => &self.update_configuration,
320            RunIngestion(..) => &self.run_ingestion,
321            AllowCompaction(..) => &self.allow_compaction,
322            RunSink(..) => &self.run_sink,
323            RunOneshotIngestion(..) => &self.run_oneshot_ingestion,
324            CancelOneshotIngestion(..) => &self.cancel_oneshot_ingestion,
325        }
326    }
327}
328
/// One metric per `StorageResponse` variant.
///
/// `ResponseMetrics::for_response` selects the metric that matches a given response.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `StorageResponse::FrontierUpper`.
    frontier_upper: M,
    /// Metric for `StorageResponse::DroppedId`.
    dropped_id: M,
    /// Metric for `StorageResponse::StagedBatches`.
    staged_batches: M,
    /// Metric for `StorageResponse::StatisticsUpdates`.
    statistics_updates: M,
    /// Metric for `StorageResponse::StatusUpdate`.
    status_update: M,
}
338
339impl<M> ResponseMetrics<M> {
340    fn build<F>(build_metric: F) -> Self
341    where
342        F: Fn(&str) -> M,
343    {
344        Self {
345            frontier_upper: build_metric("frontier_upper"),
346            dropped_id: build_metric("dropped_id"),
347            staged_batches: build_metric("staged_batches"),
348            statistics_updates: build_metric("statistics_updates"),
349            status_update: build_metric("status_update"),
350        }
351    }
352
353    fn for_response<T>(&self, response: &StorageResponse<T>) -> &M {
354        use StorageResponse::*;
355
356        match response {
357            FrontierUpper(..) => &self.frontier_upper,
358            DroppedId(..) => &self.dropped_id,
359            StagedBatches(..) => &self.staged_batches,
360            StatisticsUpdates(..) => &self.statistics_updates,
361            StatusUpdate(..) => &self.status_update,
362        }
363    }
364}
365
366/// Metrics tracked by the command history.
367#[derive(Debug)]
368pub struct HistoryMetrics {
369    /// Metrics tracking command counts.
370    pub command_counts: CommandMetrics<UIntGauge>,
371}
372
373impl HistoryMetrics {
374    pub fn reset(&self) {
375        self.command_counts.for_all(|m| m.set(0));
376    }
377}