mz_storage_client/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Metrics for the storage controller components.
11
12use std::sync::Arc;
13use std::time::Duration;
14
15use mz_cluster_client::ReplicaId;
16use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
17use mz_ore::cast::CastFrom;
18use mz_ore::metric;
19use mz_ore::metrics::{
20    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, MetricsRegistry,
21    UIntGaugeVec,
22};
23use mz_repr::GlobalId;
24use mz_service::codec::StatsCollector;
25use mz_service::transport;
26use mz_storage_types::instances::StorageInstanceId;
27use prometheus::core::{AtomicF64, AtomicU64};
28
29use crate::client::{ProtoStorageCommand, ProtoStorageResponse, StorageCommand, StorageResponse};
30
31type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
32pub type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
33
34/// Storage controller metrics
35#[derive(Debug, Clone)]
36pub struct StorageControllerMetrics {
37    // storage protocol
38    commands_total: IntCounterVec,
39    command_message_bytes_total: IntCounterVec,
40    responses_total: IntCounterVec,
41    response_message_bytes_total: IntCounterVec,
42
43    regressed_offset_known: IntCounterVec,
44    history_command_count: UIntGaugeVec,
45
46    // replica connections
47    connected_replica_count: UIntGaugeVec,
48    replica_connects_total: IntCounterVec,
49    replica_connect_wait_time_seconds_total: CounterVec,
50
51    /// Metrics shared with the compute controller.
52    shared: ControllerMetrics,
53}
54
55impl StorageControllerMetrics {
56    pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
57        Self {
58            commands_total: metrics_registry.register(metric!(
59                name: "mz_storage_commands_total",
60                help: "The total number of storage commands sent.",
61                var_labels: ["instance_id", "replica_id", "command_type"],
62            )),
63            command_message_bytes_total: metrics_registry.register(metric!(
64                name: "mz_storage_command_message_bytes_total",
65                help: "The total number of bytes sent in storage command messages.",
66                var_labels: ["instance_id", "replica_id"],
67            )),
68            responses_total: metrics_registry.register(metric!(
69                name: "mz_storage_responses_total",
70                help: "The total number of storage responses received.",
71                var_labels: ["instance_id", "replica_id", "response_type"],
72            )),
73            response_message_bytes_total: metrics_registry.register(metric!(
74                name: "mz_storage_response_message_bytes_total",
75                help: "The total number of bytes received in storage response messages.",
76                var_labels: ["instance_id", "replica_id"],
77            )),
78            regressed_offset_known: metrics_registry.register(metric!(
79                name: "mz_storage_regressed_offset_known",
80                help: "number of regressed offset_known stats for this id",
81                var_labels: ["id"],
82            )),
83            history_command_count: metrics_registry.register(metric!(
84                name: "mz_storage_controller_history_command_count",
85                help: "The number of commands in the controller's command history.",
86                var_labels: ["instance_id", "command_type"],
87            )),
88            connected_replica_count: metrics_registry.register(metric!(
89                name: "mz_storage_controller_connected_replica_count",
90                help: "The number of replicas successfully connected to the storage controller.",
91                var_labels: ["instance_id"],
92            )),
93            replica_connects_total: metrics_registry.register(metric!(
94                name: "mz_storage_controller_replica_connects_total",
95                help: "The total number of replica (re-)connections made by the storage controller.",
96                var_labels: ["instance_id", "replica_id"],
97            )),
98            replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
99                name: "mz_storage_controller_replica_connect_wait_time_seconds_total",
100                help: "The total time the storage controller spent waiting for replica (re-)connection.",
101                var_labels: ["instance_id", "replica_id"],
102            )),
103
104            shared,
105        }
106    }
107
108    pub fn regressed_offset_known(
109        &self,
110        id: mz_repr::GlobalId,
111    ) -> DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>> {
112        self.regressed_offset_known
113            .get_delete_on_drop_metric(vec![id.to_string()])
114    }
115
116    pub fn wallclock_lag_metrics(
117        &self,
118        id: GlobalId,
119        instance_id: Option<StorageInstanceId>,
120    ) -> WallclockLagMetrics {
121        self.shared
122            .wallclock_lag_metrics(id.to_string(), instance_id.map(|x| x.to_string()), None)
123    }
124
125    pub fn for_instance(&self, id: StorageInstanceId) -> InstanceMetrics {
126        let connected_replica_count = self
127            .connected_replica_count
128            .get_delete_on_drop_metric(vec![id.to_string()]);
129
130        InstanceMetrics {
131            instance_id: id,
132            metrics: self.clone(),
133            connected_replica_count,
134        }
135    }
136}
137
138#[derive(Debug)]
139pub struct InstanceMetrics {
140    instance_id: StorageInstanceId,
141    metrics: StorageControllerMetrics,
142    /// Gauge tracking the number of connected replicas.
143    pub connected_replica_count: UIntGauge,
144}
145
146impl InstanceMetrics {
147    pub fn for_replica(&self, id: ReplicaId) -> ReplicaMetrics {
148        let labels = vec![self.instance_id.to_string(), id.to_string()];
149        let extended_labels = |extra: &str| {
150            labels
151                .iter()
152                .cloned()
153                .chain([extra.into()])
154                .collect::<Vec<_>>()
155        };
156
157        ReplicaMetrics {
158            inner: Arc::new(ReplicaMetricsInner {
159                commands_total: CommandMetrics::build(|typ| {
160                    let labels = extended_labels(typ);
161                    self.metrics
162                        .commands_total
163                        .get_delete_on_drop_metric(labels)
164                }),
165                responses_total: ResponseMetrics::build(|typ| {
166                    let labels = extended_labels(typ);
167                    self.metrics
168                        .responses_total
169                        .get_delete_on_drop_metric(labels)
170                }),
171                command_message_bytes_total: self
172                    .metrics
173                    .command_message_bytes_total
174                    .get_delete_on_drop_metric(labels.clone()),
175                response_message_bytes_total: self
176                    .metrics
177                    .response_message_bytes_total
178                    .get_delete_on_drop_metric(labels.clone()),
179                replica_connects_total: self
180                    .metrics
181                    .replica_connects_total
182                    .get_delete_on_drop_metric(labels.clone()),
183                replica_connect_wait_time_seconds_total: self
184                    .metrics
185                    .replica_connect_wait_time_seconds_total
186                    .get_delete_on_drop_metric(labels),
187            }),
188        }
189    }
190
191    pub fn for_history(&self) -> HistoryMetrics {
192        let command_counts = CommandMetrics::build(|typ| {
193            let labels = vec![self.instance_id.to_string(), typ.to_string()];
194            self.metrics
195                .history_command_count
196                .get_delete_on_drop_metric(labels)
197        });
198
199        HistoryMetrics { command_counts }
200    }
201}
202
203#[derive(Debug)]
204struct ReplicaMetricsInner {
205    commands_total: CommandMetrics<IntCounter>,
206    command_message_bytes_total: IntCounter,
207    responses_total: ResponseMetrics<IntCounter>,
208    response_message_bytes_total: IntCounter,
209    /// Counter tracking the total number of (re-)connects.
210    replica_connects_total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
211    /// Counter tracking the total time spent waiting for (re-)connects.
212    replica_connect_wait_time_seconds_total: DeleteOnDropCounter<AtomicF64, Vec<String>>,
213}
214
215/// Per-instance metrics
216#[derive(Debug, Clone)]
217pub struct ReplicaMetrics {
218    inner: Arc<ReplicaMetricsInner>,
219}
220
221impl ReplicaMetrics {
222    /// Observe a successful replica connection.
223    pub fn observe_connect(&self) {
224        self.inner.replica_connects_total.inc();
225    }
226
227    /// Observe time spent waiting for a replica connection.
228    pub fn observe_connect_time(&self, wait_time: Duration) {
229        self.inner
230            .replica_connect_wait_time_seconds_total
231            .inc_by(wait_time.as_secs_f64());
232    }
233}
234
235/// Make [`ReplicaMetrics`] pluggable into the gRPC connection.
236impl StatsCollector<ProtoStorageCommand, ProtoStorageResponse> for ReplicaMetrics {
237    fn send_event(&self, item: &ProtoStorageCommand, size: usize) {
238        self.inner.commands_total.for_proto_command(item).inc();
239        self.inner
240            .command_message_bytes_total
241            .inc_by(u64::cast_from(size));
242    }
243
244    fn receive_event(&self, item: &ProtoStorageResponse, size: usize) {
245        self.inner.responses_total.for_proto_response(item).inc();
246        self.inner
247            .response_message_bytes_total
248            .inc_by(u64::cast_from(size));
249    }
250}
251
252impl<T> transport::Metrics<StorageCommand<T>, StorageResponse<T>> for ReplicaMetrics {
253    fn bytes_sent(&mut self, len: usize) {
254        self.inner
255            .command_message_bytes_total
256            .inc_by(u64::cast_from(len));
257    }
258
259    fn bytes_received(&mut self, len: usize) {
260        self.inner
261            .response_message_bytes_total
262            .inc_by(u64::cast_from(len));
263    }
264
265    fn message_sent(&mut self, msg: &StorageCommand<T>) {
266        self.inner.commands_total.for_command(msg).inc();
267    }
268
269    fn message_received(&mut self, msg: &StorageResponse<T>) {
270        self.inner.responses_total.for_response(msg).inc();
271    }
272}
273
/// One metric of type `M` for each `StorageCommand` variant.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metric for `StorageCommand::Hello`.
    pub hello: M,
    /// Metric for `StorageCommand::InitializationComplete`.
    pub initialization_complete: M,
    /// Metric for `StorageCommand::AllowWrites`.
    pub allow_writes: M,
    /// Metric for `StorageCommand::UpdateConfiguration`.
    pub update_configuration: M,
    /// Metric for `StorageCommand::RunIngestion`.
    pub run_ingestion: M,
    /// Metric for `StorageCommand::AllowCompaction`.
    pub allow_compaction: M,
    /// Metric for `StorageCommand::RunSink`.
    pub run_sink: M,
    /// Metric for `StorageCommand::RunOneshotIngestion`.
    pub run_oneshot_ingestion: M,
    /// Metric for `StorageCommand::CancelOneshotIngestion`.
    pub cancel_oneshot_ingestion: M,
}
296
297impl<M> CommandMetrics<M> {
298    fn build<F>(build_metric: F) -> Self
299    where
300        F: Fn(&str) -> M,
301    {
302        Self {
303            hello: build_metric("hello"),
304            initialization_complete: build_metric("initialization_complete"),
305            allow_writes: build_metric("allow_writes"),
306            update_configuration: build_metric("update_configuration"),
307            run_ingestion: build_metric("run_ingestion"),
308            allow_compaction: build_metric("allow_compaction"),
309            run_sink: build_metric("run_sink"),
310            run_oneshot_ingestion: build_metric("run_oneshot_ingestion"),
311            cancel_oneshot_ingestion: build_metric("cancel_oneshot_ingestion"),
312        }
313    }
314
315    fn for_all<F>(&self, f: F)
316    where
317        F: Fn(&M),
318    {
319        f(&self.hello);
320        f(&self.initialization_complete);
321        f(&self.allow_writes);
322        f(&self.update_configuration);
323        f(&self.run_ingestion);
324        f(&self.allow_compaction);
325        f(&self.run_sink);
326        f(&self.run_oneshot_ingestion);
327        f(&self.cancel_oneshot_ingestion);
328    }
329
330    pub fn for_command<T>(&self, command: &StorageCommand<T>) -> &M {
331        use StorageCommand::*;
332
333        match command {
334            Hello { .. } => &self.hello,
335            InitializationComplete => &self.initialization_complete,
336            AllowWrites => &self.allow_writes,
337            UpdateConfiguration(..) => &self.update_configuration,
338            RunIngestion(..) => &self.run_ingestion,
339            AllowCompaction(..) => &self.allow_compaction,
340            RunSink(..) => &self.run_sink,
341            RunOneshotIngestion(..) => &self.run_oneshot_ingestion,
342            CancelOneshotIngestion(..) => &self.cancel_oneshot_ingestion,
343        }
344    }
345
346    fn for_proto_command(&self, proto: &ProtoStorageCommand) -> &M {
347        use crate::client::proto_storage_command::Kind::*;
348
349        match proto.kind.as_ref().unwrap() {
350            Hello(..) => &self.hello,
351            AllowCompaction(..) => &self.allow_compaction,
352            InitializationComplete(..) => &self.initialization_complete,
353            UpdateConfiguration(..) => &self.update_configuration,
354            RunIngestion(..) => &self.run_ingestion,
355            AllowWrites(..) => &self.allow_writes,
356            RunSink(..) => &self.run_sink,
357            RunOneshotIngestion(..) => &self.run_oneshot_ingestion,
358            CancelOneshotIngestion(..) => &self.cancel_oneshot_ingestion,
359        }
360    }
361}
362
/// One metric of type `M` for each `StorageResponse` variant.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `StorageResponse::FrontierUpper`.
    frontier_upper: M,
    /// Metric for `StorageResponse::DroppedId`.
    dropped_id: M,
    /// Metric for `StorageResponse::StagedBatches`.
    staged_batches: M,
    /// Metric for `StorageResponse::StatisticsUpdates`.
    statistics_updates: M,
    /// Metric for `StorageResponse::StatusUpdate`.
    status_update: M,
}
372
373impl<M> ResponseMetrics<M> {
374    fn build<F>(build_metric: F) -> Self
375    where
376        F: Fn(&str) -> M,
377    {
378        Self {
379            frontier_upper: build_metric("frontier_upper"),
380            dropped_id: build_metric("dropped_id"),
381            staged_batches: build_metric("staged_batches"),
382            statistics_updates: build_metric("statistics_updates"),
383            status_update: build_metric("status_update"),
384        }
385    }
386
387    fn for_response<T>(&self, response: &StorageResponse<T>) -> &M {
388        use StorageResponse::*;
389
390        match response {
391            FrontierUpper(..) => &self.frontier_upper,
392            DroppedId(..) => &self.dropped_id,
393            StagedBatches(..) => &self.staged_batches,
394            StatisticsUpdates(..) => &self.statistics_updates,
395            StatusUpdate(..) => &self.status_update,
396        }
397    }
398
399    fn for_proto_response(&self, proto: &ProtoStorageResponse) -> &M {
400        use crate::client::proto_storage_response::Kind::*;
401
402        match proto.kind.as_ref().unwrap() {
403            FrontierUpper(..) => &self.frontier_upper,
404            DroppedId(..) => &self.dropped_id,
405            Stats(..) => &self.statistics_updates,
406            StatusUpdate(..) => &self.status_update,
407            StagedBatches(..) => &self.staged_batches,
408        }
409    }
410}
411
412/// Metrics tracked by the command history.
413#[derive(Debug)]
414pub struct HistoryMetrics {
415    /// Metrics tracking command counts.
416    pub command_counts: CommandMetrics<UIntGauge>,
417}
418
419impl HistoryMetrics {
420    pub fn reset(&self) {
421        self.command_counts.for_all(|m| m.set(0));
422    }
423}