1use std::sync::Arc;
13use std::time::Duration;
14
15use mz_cluster_client::ReplicaId;
16use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
17use mz_ore::cast::{CastFrom, TryCastFrom};
18use mz_ore::metric;
19use mz_ore::metrics::{
20 CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, IntCounterVec,
21 MetricVecExt, MetricsRegistry, UIntGaugeVec,
22};
23use mz_ore::stats::HISTOGRAM_BYTE_BUCKETS;
24use mz_repr::GlobalId;
25use mz_service::codec::StatsCollector;
26use mz_storage_types::instances::StorageInstanceId;
27use prometheus::core::{AtomicF64, AtomicU64};
28
29use crate::client::{ProtoStorageCommand, ProtoStorageResponse};
30
31pub type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
32
33#[derive(Debug, Clone)]
35pub struct StorageControllerMetrics {
36 messages_sent_bytes: prometheus::HistogramVec,
37 messages_received_bytes: prometheus::HistogramVec,
38 regressed_offset_known: IntCounterVec,
39 history_command_count: UIntGaugeVec,
40
41 connected_replica_count: UIntGaugeVec,
43 replica_connects_total: IntCounterVec,
44 replica_connect_wait_time_seconds_total: CounterVec,
45
46 shared: ControllerMetrics,
48}
49
50impl StorageControllerMetrics {
51 pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
52 Self {
53 messages_sent_bytes: metrics_registry.register(metric!(
54 name: "mz_storage_messages_sent_bytes",
55 help: "size of storage messages sent",
56 var_labels: ["instance_id", "replica_id"],
57 buckets: HISTOGRAM_BYTE_BUCKETS.to_vec()
58 )),
59 messages_received_bytes: metrics_registry.register(metric!(
60 name: "mz_storage_messages_received_bytes",
61 help: "size of storage messages received",
62 var_labels: ["instance_id", "replica_id"],
63 buckets: HISTOGRAM_BYTE_BUCKETS.to_vec()
64 )),
65 regressed_offset_known: metrics_registry.register(metric!(
66 name: "mz_storage_regressed_offset_known",
67 help: "number of regressed offset_known stats for this id",
68 var_labels: ["id"],
69 )),
70 history_command_count: metrics_registry.register(metric!(
71 name: "mz_storage_controller_history_command_count",
72 help: "The number of commands in the controller's command history.",
73 var_labels: ["instance_id", "command_type"],
74 )),
75 connected_replica_count: metrics_registry.register(metric!(
76 name: "mz_storage_controller_connected_replica_count",
77 help: "The number of replicas successfully connected to the storage controller.",
78 var_labels: ["instance_id"],
79 )),
80 replica_connects_total: metrics_registry.register(metric!(
81 name: "mz_storage_controller_replica_connects_total",
82 help: "The total number of replica (re-)connections made by the storage controller.",
83 var_labels: ["instance_id", "replica_id"],
84 )),
85 replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
86 name: "mz_storage_controller_replica_connect_wait_time_seconds_total",
87 help: "The total time the storage controller spent waiting for replica (re-)connection.",
88 var_labels: ["instance_id", "replica_id"],
89 )),
90
91 shared,
92 }
93 }
94
95 pub fn regressed_offset_known(
96 &self,
97 id: mz_repr::GlobalId,
98 ) -> DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>> {
99 self.regressed_offset_known
100 .get_delete_on_drop_metric(vec![id.to_string()])
101 }
102
103 pub fn wallclock_lag_metrics(
104 &self,
105 id: GlobalId,
106 instance_id: Option<StorageInstanceId>,
107 ) -> WallclockLagMetrics {
108 self.shared
109 .wallclock_lag_metrics(id.to_string(), instance_id.map(|x| x.to_string()), None)
110 }
111
112 pub fn for_instance(&self, id: StorageInstanceId) -> InstanceMetrics {
113 let connected_replica_count = self
114 .connected_replica_count
115 .get_delete_on_drop_metric(vec![id.to_string()]);
116
117 InstanceMetrics {
118 instance_id: id,
119 metrics: self.clone(),
120 connected_replica_count,
121 }
122 }
123}
124
125#[derive(Debug)]
126pub struct InstanceMetrics {
127 instance_id: StorageInstanceId,
128 metrics: StorageControllerMetrics,
129 pub connected_replica_count: UIntGauge,
131}
132
133impl InstanceMetrics {
134 pub fn for_replica(&self, id: ReplicaId) -> ReplicaMetrics {
135 let labels = vec![self.instance_id.to_string(), id.to_string()];
136 ReplicaMetrics {
137 inner: Arc::new(ReplicaMetricsInner {
138 messages_sent_bytes: self
139 .metrics
140 .messages_sent_bytes
141 .get_delete_on_drop_metric(labels.clone()),
142 messages_received_bytes: self
143 .metrics
144 .messages_received_bytes
145 .get_delete_on_drop_metric(labels.clone()),
146 replica_connects_total: self
147 .metrics
148 .replica_connects_total
149 .get_delete_on_drop_metric(labels.clone()),
150 replica_connect_wait_time_seconds_total: self
151 .metrics
152 .replica_connect_wait_time_seconds_total
153 .get_delete_on_drop_metric(labels),
154 }),
155 }
156 }
157
158 pub fn for_history(&self) -> HistoryMetrics {
159 let command_gauge = |name: &str| {
160 let labels = vec![self.instance_id.to_string(), name.to_string()];
161 self.metrics
162 .history_command_count
163 .get_delete_on_drop_metric(labels)
164 };
165
166 HistoryMetrics {
167 create_timely_count: command_gauge("create_timely"),
168 run_ingestions_count: command_gauge("run_ingestions"),
169 run_sinks_count: command_gauge("run_sinks"),
170 allow_compaction_count: command_gauge("allow_compaction"),
171 initialization_complete_count: command_gauge("initialization_complete"),
172 allow_writes_count: command_gauge("allow_writes"),
173 update_configuration_count: command_gauge("update_configuration"),
174 }
175 }
176}
177
178#[derive(Debug)]
179struct ReplicaMetricsInner {
180 messages_sent_bytes: DeleteOnDropHistogram<Vec<String>>,
181 messages_received_bytes: DeleteOnDropHistogram<Vec<String>>,
182 replica_connects_total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
184 replica_connect_wait_time_seconds_total: DeleteOnDropCounter<AtomicF64, Vec<String>>,
186}
187
188#[derive(Debug, Clone)]
190pub struct ReplicaMetrics {
191 inner: Arc<ReplicaMetricsInner>,
192}
193
194impl ReplicaMetrics {
195 pub fn observe_connect(&self) {
197 self.inner.replica_connects_total.inc();
198 }
199
200 pub fn observe_connect_time(&self, wait_time: Duration) {
202 self.inner
203 .replica_connect_wait_time_seconds_total
204 .inc_by(wait_time.as_secs_f64());
205 }
206}
207
208impl StatsCollector<ProtoStorageCommand, ProtoStorageResponse> for ReplicaMetrics {
210 fn send_event(&self, _item: &ProtoStorageCommand, size: usize) {
211 match f64::try_cast_from(u64::cast_from(size)) {
212 Some(x) => self.inner.messages_sent_bytes.observe(x),
213 None => tracing::warn!(
214 "{} has no precise representation as f64, ignoring message",
215 size
216 ),
217 }
218 }
219
220 fn receive_event(&self, _item: &ProtoStorageResponse, size: usize) {
221 match f64::try_cast_from(u64::cast_from(size)) {
222 Some(x) => self.inner.messages_received_bytes.observe(x),
223 None => tracing::warn!(
224 "{} has no precise representation as f64, ignoring message",
225 size
226 ),
227 }
228 }
229}
230
231#[derive(Debug)]
233pub struct HistoryMetrics {
234 pub create_timely_count: UIntGauge,
236 pub run_ingestions_count: UIntGauge,
238 pub run_sinks_count: UIntGauge,
240 pub allow_compaction_count: UIntGauge,
242 pub initialization_complete_count: UIntGauge,
244 pub allow_writes_count: UIntGauge,
246 pub update_configuration_count: UIntGauge,
248}
249
250impl HistoryMetrics {
251 pub fn reset(&self) {
252 self.create_timely_count.set(0);
253 self.run_ingestions_count.set(0);
254 self.run_sinks_count.set(0);
255 self.allow_compaction_count.set(0);
256 self.initialization_complete_count.set(0);
257 self.allow_writes_count.set(0);
258 self.update_configuration_count.set(0);
259 }
260}