1use std::sync::Arc;
13use std::time::Duration;
14
15use mz_cluster_client::ReplicaId;
16use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
17use mz_ore::cast::CastFrom;
18use mz_ore::metric;
19use mz_ore::metrics::{
20 CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, MetricsRegistry,
21 UIntGaugeVec,
22};
23use mz_repr::GlobalId;
24use mz_service::transport;
25use mz_storage_types::instances::StorageInstanceId;
26use prometheus::core::{AtomicF64, AtomicU64};
27
28use crate::client::{StorageCommand, StorageResponse};
29
/// Alias for a [`DeleteOnDropCounter`] backed by an `AtomicU64` with `Vec<String>` label values.
type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
/// Alias for a [`DeleteOnDropGauge`] backed by an `AtomicU64` with `Vec<String>` label values.
pub type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
32
/// Metric vectors maintained by the storage controller.
///
/// All vectors are registered in [`StorageControllerMetrics::new`]; the per-field
/// descriptions below mirror the help texts and label names used there.
#[derive(Debug, Clone)]
pub struct StorageControllerMetrics {
    /// Total number of storage commands sent, labeled by instance, replica, and command type.
    commands_total: IntCounterVec,
    /// Total number of bytes sent in storage command messages, labeled by instance and replica.
    command_message_bytes_total: IntCounterVec,
    /// Total number of storage responses received, labeled by instance, replica, and
    /// response type.
    responses_total: IntCounterVec,
    /// Total number of bytes received in storage response messages, labeled by instance and
    /// replica.
    response_message_bytes_total: IntCounterVec,

    /// Number of regressed `offset_known` stats, labeled by id.
    regressed_offset_known: IntCounterVec,
    /// Number of commands in the controller's command history, labeled by instance and
    /// command type.
    history_command_count: UIntGaugeVec,

    /// Number of replicas successfully connected to the storage controller, labeled by
    /// instance.
    connected_replica_count: UIntGaugeVec,
    /// Total number of replica (re-)connections made by the storage controller, labeled by
    /// instance and replica.
    replica_connects_total: IntCounterVec,
    /// Total time, in seconds, the storage controller spent waiting for replica
    /// (re-)connection, labeled by instance and replica.
    replica_connect_wait_time_seconds_total: CounterVec,

    /// Controller metrics supplied by the caller of `new`; `wallclock_lag_metrics`
    /// delegates to them.
    shared: ControllerMetrics,
}
53
54impl StorageControllerMetrics {
55 pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
56 Self {
57 commands_total: metrics_registry.register(metric!(
58 name: "mz_storage_commands_total",
59 help: "The total number of storage commands sent.",
60 var_labels: ["instance_id", "replica_id", "command_type"],
61 )),
62 command_message_bytes_total: metrics_registry.register(metric!(
63 name: "mz_storage_command_message_bytes_total",
64 help: "The total number of bytes sent in storage command messages.",
65 var_labels: ["instance_id", "replica_id"],
66 )),
67 responses_total: metrics_registry.register(metric!(
68 name: "mz_storage_responses_total",
69 help: "The total number of storage responses received.",
70 var_labels: ["instance_id", "replica_id", "response_type"],
71 )),
72 response_message_bytes_total: metrics_registry.register(metric!(
73 name: "mz_storage_response_message_bytes_total",
74 help: "The total number of bytes received in storage response messages.",
75 var_labels: ["instance_id", "replica_id"],
76 )),
77 regressed_offset_known: metrics_registry.register(metric!(
78 name: "mz_storage_regressed_offset_known",
79 help: "number of regressed offset_known stats for this id",
80 var_labels: ["id"],
81 )),
82 history_command_count: metrics_registry.register(metric!(
83 name: "mz_storage_controller_history_command_count",
84 help: "The number of commands in the controller's command history.",
85 var_labels: ["instance_id", "command_type"],
86 )),
87 connected_replica_count: metrics_registry.register(metric!(
88 name: "mz_storage_controller_connected_replica_count",
89 help: "The number of replicas successfully connected to the storage controller.",
90 var_labels: ["instance_id"],
91 )),
92 replica_connects_total: metrics_registry.register(metric!(
93 name: "mz_storage_controller_replica_connects_total",
94 help: "The total number of replica (re-)connections made by the storage controller.",
95 var_labels: ["instance_id", "replica_id"],
96 )),
97 replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
98 name: "mz_storage_controller_replica_connect_wait_time_seconds_total",
99 help: "The total time the storage controller spent waiting for replica (re-)connection.",
100 var_labels: ["instance_id", "replica_id"],
101 )),
102
103 shared,
104 }
105 }
106
107 pub fn regressed_offset_known(
108 &self,
109 id: mz_repr::GlobalId,
110 ) -> DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>> {
111 self.regressed_offset_known
112 .get_delete_on_drop_metric(vec![id.to_string()])
113 }
114
115 pub fn wallclock_lag_metrics(
116 &self,
117 id: GlobalId,
118 instance_id: Option<StorageInstanceId>,
119 ) -> WallclockLagMetrics {
120 self.shared
121 .wallclock_lag_metrics(id.to_string(), instance_id.map(|x| x.to_string()), None)
122 }
123
124 pub fn for_instance(&self, id: StorageInstanceId) -> InstanceMetrics {
125 let connected_replica_count = self
126 .connected_replica_count
127 .get_delete_on_drop_metric(vec![id.to_string()]);
128
129 InstanceMetrics {
130 instance_id: id,
131 metrics: self.clone(),
132 connected_replica_count,
133 }
134 }
135}
136
/// Metrics scoped to a single storage instance, as created by
/// `StorageControllerMetrics::for_instance`.
#[derive(Debug)]
pub struct InstanceMetrics {
    /// The instance whose ID is used as the `instance_id` label value.
    instance_id: StorageInstanceId,
    /// Clone of the controller-wide metric vectors, used to derive replica and history
    /// metrics.
    metrics: StorageControllerMetrics,
    /// Gauge for the number of replicas successfully connected, labeled with this instance.
    pub connected_replica_count: UIntGauge,
}
144
145impl InstanceMetrics {
146 pub fn for_replica(&self, id: ReplicaId) -> ReplicaMetrics {
147 let labels = vec![self.instance_id.to_string(), id.to_string()];
148 let extended_labels = |extra: &str| {
149 labels
150 .iter()
151 .cloned()
152 .chain([extra.into()])
153 .collect::<Vec<_>>()
154 };
155
156 ReplicaMetrics {
157 inner: Arc::new(ReplicaMetricsInner {
158 commands_total: CommandMetrics::build(|typ| {
159 let labels = extended_labels(typ);
160 self.metrics
161 .commands_total
162 .get_delete_on_drop_metric(labels)
163 }),
164 responses_total: ResponseMetrics::build(|typ| {
165 let labels = extended_labels(typ);
166 self.metrics
167 .responses_total
168 .get_delete_on_drop_metric(labels)
169 }),
170 command_message_bytes_total: self
171 .metrics
172 .command_message_bytes_total
173 .get_delete_on_drop_metric(labels.clone()),
174 response_message_bytes_total: self
175 .metrics
176 .response_message_bytes_total
177 .get_delete_on_drop_metric(labels.clone()),
178 replica_connects_total: self
179 .metrics
180 .replica_connects_total
181 .get_delete_on_drop_metric(labels.clone()),
182 replica_connect_wait_time_seconds_total: self
183 .metrics
184 .replica_connect_wait_time_seconds_total
185 .get_delete_on_drop_metric(labels),
186 }),
187 }
188 }
189
190 pub fn for_history(&self) -> HistoryMetrics {
191 let command_counts = CommandMetrics::build(|typ| {
192 let labels = vec![self.instance_id.to_string(), typ.to_string()];
193 self.metrics
194 .history_command_count
195 .get_delete_on_drop_metric(labels)
196 });
197
198 HistoryMetrics { command_counts }
199 }
200}
201
/// The per-replica metrics shared behind the `Arc` in [`ReplicaMetrics`].
#[derive(Debug)]
struct ReplicaMetricsInner {
    /// Counters of commands sent, one per command type.
    commands_total: CommandMetrics<IntCounter>,
    /// Counter of bytes sent in command messages.
    command_message_bytes_total: IntCounter,
    /// Counters of responses received, one per response type.
    responses_total: ResponseMetrics<IntCounter>,
    /// Counter of bytes received in response messages.
    response_message_bytes_total: IntCounter,
    /// Counter of replica (re-)connections made by the storage controller.
    replica_connects_total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Counter of the total time, in seconds, spent waiting for replica (re-)connection.
    replica_connect_wait_time_seconds_total: DeleteOnDropCounter<AtomicF64, Vec<String>>,
}
213
/// Per-replica metrics.
///
/// The metrics live behind an `Arc`, so clones share the same underlying metrics.
#[derive(Debug, Clone)]
pub struct ReplicaMetrics {
    /// The shared per-replica metrics.
    inner: Arc<ReplicaMetricsInner>,
}
219
220impl ReplicaMetrics {
221 pub fn observe_connect(&self) {
223 self.inner.replica_connects_total.inc();
224 }
225
226 pub fn observe_connect_time(&self, wait_time: Duration) {
228 self.inner
229 .replica_connect_wait_time_seconds_total
230 .inc_by(wait_time.as_secs_f64());
231 }
232}
233
234impl<T> transport::Metrics<StorageCommand<T>, StorageResponse<T>> for ReplicaMetrics {
235 fn bytes_sent(&mut self, len: usize) {
236 self.inner
237 .command_message_bytes_total
238 .inc_by(u64::cast_from(len));
239 }
240
241 fn bytes_received(&mut self, len: usize) {
242 self.inner
243 .response_message_bytes_total
244 .inc_by(u64::cast_from(len));
245 }
246
247 fn message_sent(&mut self, msg: &StorageCommand<T>) {
248 self.inner.commands_total.for_command(msg).inc();
249 }
250
251 fn message_received(&mut self, msg: &StorageResponse<T>) {
252 self.inner.responses_total.for_response(msg).inc();
253 }
254}
255
/// One metric of type `M` for each storage command type.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metric for `Hello` commands.
    pub hello: M,
    /// Metric for `InitializationComplete` commands.
    pub initialization_complete: M,
    /// Metric for `AllowWrites` commands.
    pub allow_writes: M,
    /// Metric for `UpdateConfiguration` commands.
    pub update_configuration: M,
    /// Metric for `RunIngestion` commands.
    pub run_ingestion: M,
    /// Metric for `AllowCompaction` commands.
    pub allow_compaction: M,
    /// Metric for `RunSink` commands.
    pub run_sink: M,
    /// Metric for `RunOneshotIngestion` commands.
    pub run_oneshot_ingestion: M,
    /// Metric for `CancelOneshotIngestion` commands.
    pub cancel_oneshot_ingestion: M,
}
278
279impl<M> CommandMetrics<M> {
280 fn build<F>(build_metric: F) -> Self
281 where
282 F: Fn(&str) -> M,
283 {
284 Self {
285 hello: build_metric("hello"),
286 initialization_complete: build_metric("initialization_complete"),
287 allow_writes: build_metric("allow_writes"),
288 update_configuration: build_metric("update_configuration"),
289 run_ingestion: build_metric("run_ingestion"),
290 allow_compaction: build_metric("allow_compaction"),
291 run_sink: build_metric("run_sink"),
292 run_oneshot_ingestion: build_metric("run_oneshot_ingestion"),
293 cancel_oneshot_ingestion: build_metric("cancel_oneshot_ingestion"),
294 }
295 }
296
297 fn for_all<F>(&self, f: F)
298 where
299 F: Fn(&M),
300 {
301 f(&self.hello);
302 f(&self.initialization_complete);
303 f(&self.allow_writes);
304 f(&self.update_configuration);
305 f(&self.run_ingestion);
306 f(&self.allow_compaction);
307 f(&self.run_sink);
308 f(&self.run_oneshot_ingestion);
309 f(&self.cancel_oneshot_ingestion);
310 }
311
312 pub fn for_command<T>(&self, command: &StorageCommand<T>) -> &M {
313 use StorageCommand::*;
314
315 match command {
316 Hello { .. } => &self.hello,
317 InitializationComplete => &self.initialization_complete,
318 AllowWrites => &self.allow_writes,
319 UpdateConfiguration(..) => &self.update_configuration,
320 RunIngestion(..) => &self.run_ingestion,
321 AllowCompaction(..) => &self.allow_compaction,
322 RunSink(..) => &self.run_sink,
323 RunOneshotIngestion(..) => &self.run_oneshot_ingestion,
324 CancelOneshotIngestion(..) => &self.cancel_oneshot_ingestion,
325 }
326 }
327}
328
/// One metric of type `M` for each storage response type.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `FrontierUpper` responses.
    frontier_upper: M,
    /// Metric for `DroppedId` responses.
    dropped_id: M,
    /// Metric for `StagedBatches` responses.
    staged_batches: M,
    /// Metric for `StatisticsUpdates` responses.
    statistics_updates: M,
    /// Metric for `StatusUpdate` responses.
    status_update: M,
}
338
339impl<M> ResponseMetrics<M> {
340 fn build<F>(build_metric: F) -> Self
341 where
342 F: Fn(&str) -> M,
343 {
344 Self {
345 frontier_upper: build_metric("frontier_upper"),
346 dropped_id: build_metric("dropped_id"),
347 staged_batches: build_metric("staged_batches"),
348 statistics_updates: build_metric("statistics_updates"),
349 status_update: build_metric("status_update"),
350 }
351 }
352
353 fn for_response<T>(&self, response: &StorageResponse<T>) -> &M {
354 use StorageResponse::*;
355
356 match response {
357 FrontierUpper(..) => &self.frontier_upper,
358 DroppedId(..) => &self.dropped_id,
359 StagedBatches(..) => &self.staged_batches,
360 StatisticsUpdates(..) => &self.statistics_updates,
361 StatusUpdate(..) => &self.status_update,
362 }
363 }
364}
365
/// Metrics describing the controller's command history for one instance.
#[derive(Debug)]
pub struct HistoryMetrics {
    /// Gauges for the number of commands in the command history, one per command type.
    pub command_counts: CommandMetrics<UIntGauge>,
}
372
373impl HistoryMetrics {
374 pub fn reset(&self) {
375 self.command_counts.for_all(|m| m.set(0));
376 }
377}