1use std::sync::Arc;
13use std::time::Duration;
14
15use mz_cluster_client::ReplicaId;
16use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
17use mz_ore::cast::CastFrom;
18use mz_ore::metric;
19use mz_ore::metrics::{
20 CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, MetricsRegistry,
21 UIntGaugeVec,
22};
23use mz_repr::GlobalId;
24use mz_service::codec::StatsCollector;
25use mz_service::transport;
26use mz_storage_types::instances::StorageInstanceId;
27use prometheus::core::{AtomicF64, AtomicU64};
28
29use crate::client::{ProtoStorageCommand, ProtoStorageResponse, StorageCommand, StorageResponse};
30
/// Shorthand for a `DeleteOnDropCounter` over `AtomicU64` whose label values are held as a
/// `Vec<String>`.
type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
/// Shorthand for a `DeleteOnDropGauge` over `AtomicU64` whose label values are held as a
/// `Vec<String>`.
pub type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
33
#[derive(Debug, Clone)]
/// Storage controller metrics.
///
/// Holds the metric vectors registered by `StorageControllerMetrics::new`. Concrete metrics
/// are obtained from these vectors by supplying label values (instance, replica, command or
/// response type, or collection id).
pub struct StorageControllerMetrics {
    // Command/response traffic between the controller and replicas.
    /// Total number of storage commands sent; labels: `instance_id`, `replica_id`,
    /// `command_type`.
    commands_total: IntCounterVec,
    /// Total number of bytes sent in storage command messages; labels: `instance_id`,
    /// `replica_id`.
    command_message_bytes_total: IntCounterVec,
    /// Total number of storage responses received; labels: `instance_id`, `replica_id`,
    /// `response_type`.
    responses_total: IntCounterVec,
    /// Total number of bytes received in storage response messages; labels: `instance_id`,
    /// `replica_id`.
    response_message_bytes_total: IntCounterVec,

    /// Number of regressed `offset_known` stats; label: `id`.
    regressed_offset_known: IntCounterVec,
    /// Number of commands in the controller's command history; labels: `instance_id`,
    /// `command_type`.
    history_command_count: UIntGaugeVec,

    // Replica connection tracking.
    /// Number of replicas successfully connected to the storage controller; label:
    /// `instance_id`.
    connected_replica_count: UIntGaugeVec,
    /// Total number of replica (re-)connections made by the storage controller; labels:
    /// `instance_id`, `replica_id`.
    replica_connects_total: IntCounterVec,
    /// Total time, in seconds, the storage controller spent waiting for replica
    /// (re-)connection; labels: `instance_id`, `replica_id`.
    replica_connect_wait_time_seconds_total: CounterVec,

    /// `ControllerMetrics` handle from `mz_cluster_client`; used here to create wallclock lag
    /// metrics.
    shared: ControllerMetrics,
}
54
55impl StorageControllerMetrics {
56 pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
57 Self {
58 commands_total: metrics_registry.register(metric!(
59 name: "mz_storage_commands_total",
60 help: "The total number of storage commands sent.",
61 var_labels: ["instance_id", "replica_id", "command_type"],
62 )),
63 command_message_bytes_total: metrics_registry.register(metric!(
64 name: "mz_storage_command_message_bytes_total",
65 help: "The total number of bytes sent in storage command messages.",
66 var_labels: ["instance_id", "replica_id"],
67 )),
68 responses_total: metrics_registry.register(metric!(
69 name: "mz_storage_responses_total",
70 help: "The total number of storage responses received.",
71 var_labels: ["instance_id", "replica_id", "response_type"],
72 )),
73 response_message_bytes_total: metrics_registry.register(metric!(
74 name: "mz_storage_response_message_bytes_total",
75 help: "The total number of bytes received in storage response messages.",
76 var_labels: ["instance_id", "replica_id"],
77 )),
78 regressed_offset_known: metrics_registry.register(metric!(
79 name: "mz_storage_regressed_offset_known",
80 help: "number of regressed offset_known stats for this id",
81 var_labels: ["id"],
82 )),
83 history_command_count: metrics_registry.register(metric!(
84 name: "mz_storage_controller_history_command_count",
85 help: "The number of commands in the controller's command history.",
86 var_labels: ["instance_id", "command_type"],
87 )),
88 connected_replica_count: metrics_registry.register(metric!(
89 name: "mz_storage_controller_connected_replica_count",
90 help: "The number of replicas successfully connected to the storage controller.",
91 var_labels: ["instance_id"],
92 )),
93 replica_connects_total: metrics_registry.register(metric!(
94 name: "mz_storage_controller_replica_connects_total",
95 help: "The total number of replica (re-)connections made by the storage controller.",
96 var_labels: ["instance_id", "replica_id"],
97 )),
98 replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
99 name: "mz_storage_controller_replica_connect_wait_time_seconds_total",
100 help: "The total time the storage controller spent waiting for replica (re-)connection.",
101 var_labels: ["instance_id", "replica_id"],
102 )),
103
104 shared,
105 }
106 }
107
108 pub fn regressed_offset_known(
109 &self,
110 id: mz_repr::GlobalId,
111 ) -> DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>> {
112 self.regressed_offset_known
113 .get_delete_on_drop_metric(vec![id.to_string()])
114 }
115
116 pub fn wallclock_lag_metrics(
117 &self,
118 id: GlobalId,
119 instance_id: Option<StorageInstanceId>,
120 ) -> WallclockLagMetrics {
121 self.shared
122 .wallclock_lag_metrics(id.to_string(), instance_id.map(|x| x.to_string()), None)
123 }
124
125 pub fn for_instance(&self, id: StorageInstanceId) -> InstanceMetrics {
126 let connected_replica_count = self
127 .connected_replica_count
128 .get_delete_on_drop_metric(vec![id.to_string()]);
129
130 InstanceMetrics {
131 instance_id: id,
132 metrics: self.clone(),
133 connected_replica_count,
134 }
135 }
136}
137
/// Metrics scoped to a single storage instance.
#[derive(Debug)]
pub struct InstanceMetrics {
    /// The instance whose id is used as the `instance_id` label value.
    instance_id: StorageInstanceId,
    /// The controller-level metric vectors from which per-replica and history metrics are
    /// derived.
    metrics: StorageControllerMetrics,
    /// Gauge from the `mz_storage_controller_connected_replica_count` vector, labeled with
    /// this instance's id.
    pub connected_replica_count: UIntGauge,
}
145
146impl InstanceMetrics {
147 pub fn for_replica(&self, id: ReplicaId) -> ReplicaMetrics {
148 let labels = vec![self.instance_id.to_string(), id.to_string()];
149 let extended_labels = |extra: &str| {
150 labels
151 .iter()
152 .cloned()
153 .chain([extra.into()])
154 .collect::<Vec<_>>()
155 };
156
157 ReplicaMetrics {
158 inner: Arc::new(ReplicaMetricsInner {
159 commands_total: CommandMetrics::build(|typ| {
160 let labels = extended_labels(typ);
161 self.metrics
162 .commands_total
163 .get_delete_on_drop_metric(labels)
164 }),
165 responses_total: ResponseMetrics::build(|typ| {
166 let labels = extended_labels(typ);
167 self.metrics
168 .responses_total
169 .get_delete_on_drop_metric(labels)
170 }),
171 command_message_bytes_total: self
172 .metrics
173 .command_message_bytes_total
174 .get_delete_on_drop_metric(labels.clone()),
175 response_message_bytes_total: self
176 .metrics
177 .response_message_bytes_total
178 .get_delete_on_drop_metric(labels.clone()),
179 replica_connects_total: self
180 .metrics
181 .replica_connects_total
182 .get_delete_on_drop_metric(labels.clone()),
183 replica_connect_wait_time_seconds_total: self
184 .metrics
185 .replica_connect_wait_time_seconds_total
186 .get_delete_on_drop_metric(labels),
187 }),
188 }
189 }
190
191 pub fn for_history(&self) -> HistoryMetrics {
192 let command_counts = CommandMetrics::build(|typ| {
193 let labels = vec![self.instance_id.to_string(), typ.to_string()];
194 self.metrics
195 .history_command_count
196 .get_delete_on_drop_metric(labels)
197 });
198
199 HistoryMetrics { command_counts }
200 }
201}
202
203#[derive(Debug)]
204struct ReplicaMetricsInner {
205 commands_total: CommandMetrics<IntCounter>,
206 command_message_bytes_total: IntCounter,
207 responses_total: ResponseMetrics<IntCounter>,
208 response_message_bytes_total: IntCounter,
209 replica_connects_total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
211 replica_connect_wait_time_seconds_total: DeleteOnDropCounter<AtomicF64, Vec<String>>,
213}
214
/// Per-replica metrics.
///
/// Cloning is cheap: clones share the same underlying metric handles through an `Arc`.
#[derive(Debug, Clone)]
pub struct ReplicaMetrics {
    /// The shared metric handles.
    inner: Arc<ReplicaMetricsInner>,
}
220
221impl ReplicaMetrics {
222 pub fn observe_connect(&self) {
224 self.inner.replica_connects_total.inc();
225 }
226
227 pub fn observe_connect_time(&self, wait_time: Duration) {
229 self.inner
230 .replica_connect_wait_time_seconds_total
231 .inc_by(wait_time.as_secs_f64());
232 }
233}
234
235impl StatsCollector<ProtoStorageCommand, ProtoStorageResponse> for ReplicaMetrics {
237 fn send_event(&self, item: &ProtoStorageCommand, size: usize) {
238 self.inner.commands_total.for_proto_command(item).inc();
239 self.inner
240 .command_message_bytes_total
241 .inc_by(u64::cast_from(size));
242 }
243
244 fn receive_event(&self, item: &ProtoStorageResponse, size: usize) {
245 self.inner.responses_total.for_proto_response(item).inc();
246 self.inner
247 .response_message_bytes_total
248 .inc_by(u64::cast_from(size));
249 }
250}
251
252impl<T> transport::Metrics<StorageCommand<T>, StorageResponse<T>> for ReplicaMetrics {
253 fn bytes_sent(&mut self, len: usize) {
254 self.inner
255 .command_message_bytes_total
256 .inc_by(u64::cast_from(len));
257 }
258
259 fn bytes_received(&mut self, len: usize) {
260 self.inner
261 .response_message_bytes_total
262 .inc_by(u64::cast_from(len));
263 }
264
265 fn message_sent(&mut self, msg: &StorageCommand<T>) {
266 self.inner.commands_total.for_command(msg).inc();
267 }
268
269 fn message_received(&mut self, msg: &StorageResponse<T>) {
270 self.inner.responses_total.for_response(msg).inc();
271 }
272}
273
/// A set of metrics of type `M`, one per storage command type.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metric for the `Hello` command.
    pub hello: M,
    /// Metric for the `InitializationComplete` command.
    pub initialization_complete: M,
    /// Metric for the `AllowWrites` command.
    pub allow_writes: M,
    /// Metric for the `UpdateConfiguration` command.
    pub update_configuration: M,
    /// Metric for the `RunIngestion` command.
    pub run_ingestion: M,
    /// Metric for the `AllowCompaction` command.
    pub allow_compaction: M,
    /// Metric for the `RunSink` command.
    pub run_sink: M,
    /// Metric for the `RunOneshotIngestion` command.
    pub run_oneshot_ingestion: M,
    /// Metric for the `CancelOneshotIngestion` command.
    pub cancel_oneshot_ingestion: M,
}
296
297impl<M> CommandMetrics<M> {
298 fn build<F>(build_metric: F) -> Self
299 where
300 F: Fn(&str) -> M,
301 {
302 Self {
303 hello: build_metric("hello"),
304 initialization_complete: build_metric("initialization_complete"),
305 allow_writes: build_metric("allow_writes"),
306 update_configuration: build_metric("update_configuration"),
307 run_ingestion: build_metric("run_ingestion"),
308 allow_compaction: build_metric("allow_compaction"),
309 run_sink: build_metric("run_sink"),
310 run_oneshot_ingestion: build_metric("run_oneshot_ingestion"),
311 cancel_oneshot_ingestion: build_metric("cancel_oneshot_ingestion"),
312 }
313 }
314
315 fn for_all<F>(&self, f: F)
316 where
317 F: Fn(&M),
318 {
319 f(&self.hello);
320 f(&self.initialization_complete);
321 f(&self.allow_writes);
322 f(&self.update_configuration);
323 f(&self.run_ingestion);
324 f(&self.allow_compaction);
325 f(&self.run_sink);
326 f(&self.run_oneshot_ingestion);
327 f(&self.cancel_oneshot_ingestion);
328 }
329
330 pub fn for_command<T>(&self, command: &StorageCommand<T>) -> &M {
331 use StorageCommand::*;
332
333 match command {
334 Hello { .. } => &self.hello,
335 InitializationComplete => &self.initialization_complete,
336 AllowWrites => &self.allow_writes,
337 UpdateConfiguration(..) => &self.update_configuration,
338 RunIngestion(..) => &self.run_ingestion,
339 AllowCompaction(..) => &self.allow_compaction,
340 RunSink(..) => &self.run_sink,
341 RunOneshotIngestion(..) => &self.run_oneshot_ingestion,
342 CancelOneshotIngestion(..) => &self.cancel_oneshot_ingestion,
343 }
344 }
345
346 fn for_proto_command(&self, proto: &ProtoStorageCommand) -> &M {
347 use crate::client::proto_storage_command::Kind::*;
348
349 match proto.kind.as_ref().unwrap() {
350 Hello(..) => &self.hello,
351 AllowCompaction(..) => &self.allow_compaction,
352 InitializationComplete(..) => &self.initialization_complete,
353 UpdateConfiguration(..) => &self.update_configuration,
354 RunIngestion(..) => &self.run_ingestion,
355 AllowWrites(..) => &self.allow_writes,
356 RunSink(..) => &self.run_sink,
357 RunOneshotIngestion(..) => &self.run_oneshot_ingestion,
358 CancelOneshotIngestion(..) => &self.cancel_oneshot_ingestion,
359 }
360 }
361}
362
/// A set of metrics of type `M`, one per storage response type.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `FrontierUpper` responses.
    frontier_upper: M,
    /// Metric for `DroppedId` responses.
    dropped_id: M,
    /// Metric for `StagedBatches` responses.
    staged_batches: M,
    /// Metric for `StatisticsUpdates` responses (proto kind `Stats`).
    statistics_updates: M,
    /// Metric for `StatusUpdate` responses.
    status_update: M,
}
372
373impl<M> ResponseMetrics<M> {
374 fn build<F>(build_metric: F) -> Self
375 where
376 F: Fn(&str) -> M,
377 {
378 Self {
379 frontier_upper: build_metric("frontier_upper"),
380 dropped_id: build_metric("dropped_id"),
381 staged_batches: build_metric("staged_batches"),
382 statistics_updates: build_metric("statistics_updates"),
383 status_update: build_metric("status_update"),
384 }
385 }
386
387 fn for_response<T>(&self, response: &StorageResponse<T>) -> &M {
388 use StorageResponse::*;
389
390 match response {
391 FrontierUpper(..) => &self.frontier_upper,
392 DroppedId(..) => &self.dropped_id,
393 StagedBatches(..) => &self.staged_batches,
394 StatisticsUpdates(..) => &self.statistics_updates,
395 StatusUpdate(..) => &self.status_update,
396 }
397 }
398
399 fn for_proto_response(&self, proto: &ProtoStorageResponse) -> &M {
400 use crate::client::proto_storage_response::Kind::*;
401
402 match proto.kind.as_ref().unwrap() {
403 FrontierUpper(..) => &self.frontier_upper,
404 DroppedId(..) => &self.dropped_id,
405 Stats(..) => &self.statistics_updates,
406 StatusUpdate(..) => &self.status_update,
407 StagedBatches(..) => &self.staged_batches,
408 }
409 }
410}
411
/// Metrics describing the controller's command history for one instance.
#[derive(Debug)]
pub struct HistoryMetrics {
    /// One gauge per command type, drawn from the
    /// `mz_storage_controller_history_command_count` vector.
    pub command_counts: CommandMetrics<UIntGauge>,
}
418
419impl HistoryMetrics {
420 pub fn reset(&self) {
421 self.command_counts.for_all(|m| m.set(0));
422 }
423}