mz_storage_client/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for the storage controller components
11
12use std::sync::Arc;
13use std::time::Duration;
14
15use mz_cluster_client::ReplicaId;
16use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
17use mz_ore::cast::{CastFrom, TryCastFrom};
18use mz_ore::metric;
19use mz_ore::metrics::{
20    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, IntCounterVec,
21    MetricVecExt, MetricsRegistry, UIntGaugeVec,
22};
23use mz_ore::stats::HISTOGRAM_BYTE_BUCKETS;
24use mz_repr::GlobalId;
25use mz_service::codec::StatsCollector;
26use mz_storage_types::instances::StorageInstanceId;
27use prometheus::core::{AtomicF64, AtomicU64};
28
29use crate::client::{ProtoStorageCommand, ProtoStorageResponse};
30
/// A [`DeleteOnDropGauge`] over `AtomicU64` values whose label values are
/// supplied as a `Vec<String>`.
pub type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
32
/// Storage controller metrics
///
/// Holds the metric vectors registered by [`StorageControllerMetrics::new`].
/// Handles bound to concrete label values are derived from these vectors,
/// e.g. via [`StorageControllerMetrics::for_instance`].
#[derive(Debug, Clone)]
pub struct StorageControllerMetrics {
    /// Size of storage messages sent, labeled by `instance_id` and `replica_id`.
    messages_sent_bytes: prometheus::HistogramVec,
    /// Size of storage messages received, labeled by `instance_id` and `replica_id`.
    messages_received_bytes: prometheus::HistogramVec,
    /// Number of regressed `offset_known` stats, labeled by `id`.
    regressed_offset_known: IntCounterVec,
    /// Number of commands in the controller's command history, labeled by
    /// `instance_id` and `command_type`.
    history_command_count: UIntGaugeVec,

    // replica connections
    /// Number of replicas successfully connected, labeled by `instance_id`.
    connected_replica_count: UIntGaugeVec,
    /// Total number of replica (re-)connections, labeled by `instance_id` and
    /// `replica_id`.
    replica_connects_total: IntCounterVec,
    /// Total time spent waiting for replica (re-)connection, labeled by
    /// `instance_id` and `replica_id`.
    replica_connect_wait_time_seconds_total: CounterVec,

    /// Metrics shared with the compute controller.
    shared: ControllerMetrics,
}
49
50impl StorageControllerMetrics {
51    pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
52        Self {
53            messages_sent_bytes: metrics_registry.register(metric!(
54                name: "mz_storage_messages_sent_bytes",
55                help: "size of storage messages sent",
56                var_labels: ["instance_id", "replica_id"],
57                buckets: HISTOGRAM_BYTE_BUCKETS.to_vec()
58            )),
59            messages_received_bytes: metrics_registry.register(metric!(
60                name: "mz_storage_messages_received_bytes",
61                help: "size of storage messages received",
62                var_labels: ["instance_id", "replica_id"],
63                buckets: HISTOGRAM_BYTE_BUCKETS.to_vec()
64            )),
65            regressed_offset_known: metrics_registry.register(metric!(
66                name: "mz_storage_regressed_offset_known",
67                help: "number of regressed offset_known stats for this id",
68                var_labels: ["id"],
69            )),
70            history_command_count: metrics_registry.register(metric!(
71                name: "mz_storage_controller_history_command_count",
72                help: "The number of commands in the controller's command history.",
73                var_labels: ["instance_id", "command_type"],
74            )),
75            connected_replica_count: metrics_registry.register(metric!(
76                name: "mz_storage_controller_connected_replica_count",
77                help: "The number of replicas successfully connected to the storage controller.",
78                var_labels: ["instance_id"],
79            )),
80            replica_connects_total: metrics_registry.register(metric!(
81                name: "mz_storage_controller_replica_connects_total",
82                help: "The total number of replica (re-)connections made by the storage controller.",
83                var_labels: ["instance_id", "replica_id"],
84            )),
85            replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
86                name: "mz_storage_controller_replica_connect_wait_time_seconds_total",
87                help: "The total time the storage controller spent waiting for replica (re-)connection.",
88                var_labels: ["instance_id", "replica_id"],
89            )),
90
91            shared,
92        }
93    }
94
95    pub fn regressed_offset_known(
96        &self,
97        id: mz_repr::GlobalId,
98    ) -> DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>> {
99        self.regressed_offset_known
100            .get_delete_on_drop_metric(vec![id.to_string()])
101    }
102
103    pub fn wallclock_lag_metrics(
104        &self,
105        id: GlobalId,
106        instance_id: Option<StorageInstanceId>,
107    ) -> WallclockLagMetrics {
108        self.shared
109            .wallclock_lag_metrics(id.to_string(), instance_id.map(|x| x.to_string()), None)
110    }
111
112    pub fn for_instance(&self, id: StorageInstanceId) -> InstanceMetrics {
113        let connected_replica_count = self
114            .connected_replica_count
115            .get_delete_on_drop_metric(vec![id.to_string()]);
116
117        InstanceMetrics {
118            instance_id: id,
119            metrics: self.clone(),
120            connected_replica_count,
121        }
122    }
123}
124
/// Metrics scoped to a single storage instance, as created by
/// [`StorageControllerMetrics::for_instance`].
#[derive(Debug)]
pub struct InstanceMetrics {
    /// ID of the instance; its string form is used as the `instance_id` label value.
    instance_id: StorageInstanceId,
    /// Clone of the controller metrics from which replica and history metrics
    /// are derived.
    metrics: StorageControllerMetrics,
    /// Gauge tracking the number of connected replicas.
    pub connected_replica_count: UIntGauge,
}
132
133impl InstanceMetrics {
134    pub fn for_replica(&self, id: ReplicaId) -> ReplicaMetrics {
135        let labels = vec![self.instance_id.to_string(), id.to_string()];
136        ReplicaMetrics {
137            inner: Arc::new(ReplicaMetricsInner {
138                messages_sent_bytes: self
139                    .metrics
140                    .messages_sent_bytes
141                    .get_delete_on_drop_metric(labels.clone()),
142                messages_received_bytes: self
143                    .metrics
144                    .messages_received_bytes
145                    .get_delete_on_drop_metric(labels.clone()),
146                replica_connects_total: self
147                    .metrics
148                    .replica_connects_total
149                    .get_delete_on_drop_metric(labels.clone()),
150                replica_connect_wait_time_seconds_total: self
151                    .metrics
152                    .replica_connect_wait_time_seconds_total
153                    .get_delete_on_drop_metric(labels),
154            }),
155        }
156    }
157
158    pub fn for_history(&self) -> HistoryMetrics {
159        let command_gauge = |name: &str| {
160            let labels = vec![self.instance_id.to_string(), name.to_string()];
161            self.metrics
162                .history_command_count
163                .get_delete_on_drop_metric(labels)
164        };
165
166        HistoryMetrics {
167            create_timely_count: command_gauge("create_timely"),
168            run_ingestions_count: command_gauge("run_ingestions"),
169            run_sinks_count: command_gauge("run_sinks"),
170            allow_compaction_count: command_gauge("allow_compaction"),
171            initialization_complete_count: command_gauge("initialization_complete"),
172            allow_writes_count: command_gauge("allow_writes"),
173            update_configuration_count: command_gauge("update_configuration"),
174        }
175    }
176}
177
/// The metric handles behind a [`ReplicaMetrics`].
#[derive(Debug)]
struct ReplicaMetricsInner {
    /// Histogram of the sizes passed to `send_event`.
    messages_sent_bytes: DeleteOnDropHistogram<Vec<String>>,
    /// Histogram of the sizes passed to `receive_event`.
    messages_received_bytes: DeleteOnDropHistogram<Vec<String>>,
    /// Counter tracking the total number of (re-)connects.
    replica_connects_total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Counter tracking the total time spent waiting for (re-)connects.
    replica_connect_wait_time_seconds_total: DeleteOnDropCounter<AtomicF64, Vec<String>>,
}
187
/// Per-replica metrics
///
/// Clones share the same underlying metric handles through an [`Arc`].
#[derive(Debug, Clone)]
pub struct ReplicaMetrics {
    /// Shared metric handles for this replica.
    inner: Arc<ReplicaMetricsInner>,
}
193
194impl ReplicaMetrics {
195    /// Observe a successful replica connection.
196    pub fn observe_connect(&self) {
197        self.inner.replica_connects_total.inc();
198    }
199
200    /// Observe time spent waiting for a replica connection.
201    pub fn observe_connect_time(&self, wait_time: Duration) {
202        self.inner
203            .replica_connect_wait_time_seconds_total
204            .inc_by(wait_time.as_secs_f64());
205    }
206}
207
208/// Make [`ReplicaMetrics`] pluggable into the gRPC connection.
209impl StatsCollector<ProtoStorageCommand, ProtoStorageResponse> for ReplicaMetrics {
210    fn send_event(&self, _item: &ProtoStorageCommand, size: usize) {
211        match f64::try_cast_from(u64::cast_from(size)) {
212            Some(x) => self.inner.messages_sent_bytes.observe(x),
213            None => tracing::warn!(
214                "{} has no precise representation as f64, ignoring message",
215                size
216            ),
217        }
218    }
219
220    fn receive_event(&self, _item: &ProtoStorageResponse, size: usize) {
221        match f64::try_cast_from(u64::cast_from(size)) {
222            Some(x) => self.inner.messages_received_bytes.observe(x),
223            None => tracing::warn!(
224                "{} has no precise representation as f64, ignoring message",
225                size
226            ),
227        }
228    }
229}
230
/// Metrics tracked by the command history.
///
/// One gauge per command type; [`HistoryMetrics::reset`] sets all of them
/// back to zero.
#[derive(Debug)]
pub struct HistoryMetrics {
    /// Number of `CreateTimely` commands.
    pub create_timely_count: UIntGauge,
    /// Number of `RunIngestion` commands.
    pub run_ingestions_count: UIntGauge,
    /// Number of `RunSink` commands.
    pub run_sinks_count: UIntGauge,
    /// Number of `AllowCompaction` commands.
    pub allow_compaction_count: UIntGauge,
    /// Number of `InitializationComplete` commands.
    pub initialization_complete_count: UIntGauge,
    /// Number of `AllowWrites` commands.
    pub allow_writes_count: UIntGauge,
    /// Number of `UpdateConfiguration` commands.
    pub update_configuration_count: UIntGauge,
}
249
250impl HistoryMetrics {
251    pub fn reset(&self) {
252        self.create_timely_count.set(0);
253        self.run_ingestions_count.set(0);
254        self.run_sinks_count.set(0);
255        self.allow_compaction_count.set(0);
256        self.initialization_complete_count.set(0);
257        self.allow_writes_count.set(0);
258        self.update_configuration_count.set(0);
259    }
260}