1use std::borrow::Borrow;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_cluster_client::ReplicaId;
17use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
18use mz_compute_types::ComputeInstanceId;
19use mz_ore::cast::CastFrom;
20use mz_ore::metric;
21use mz_ore::metrics::raw::UIntGaugeVec;
22use mz_ore::metrics::{
23 CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec,
24 IntCounterVec, MetricVecExt, MetricsRegistry,
25};
26use mz_ore::stats::histogram_seconds_buckets;
27use mz_repr::GlobalId;
28use mz_service::transport;
29use prometheus::core::{AtomicF64, AtomicU64};
30
31use crate::protocol::command::ComputeCommand;
32use crate::protocol::response::{ComputeResponse, PeekResponse};
33
34pub(crate) type Counter = DeleteOnDropCounter<AtomicF64, Vec<String>>;
35pub(crate) type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
36pub(crate) type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
37type Histogram = DeleteOnDropHistogram<Vec<String>>;
38
/// Controller-wide compute metric vectors, registered once by `ComputeControllerMetrics::new`.
///
/// Per-instance handles are derived with `for_instance`; per-replica handles are derived from
/// those via `InstanceMetrics::for_replica`.
#[derive(Debug, Clone)]
pub struct ComputeControllerMetrics {
    // Replica protocol traffic (labels: instance_id, replica_id[, command/response type]).
    /// `mz_compute_commands_total`: commands sent, per command type.
    commands_total: IntCounterVec,
    /// `mz_compute_command_message_bytes_total`: bytes of command messages.
    command_message_bytes_total: IntCounterVec,
    /// `mz_compute_responses_total`: responses, per response type.
    responses_total: IntCounterVec,
    /// `mz_compute_response_message_bytes_total`: bytes of response messages.
    response_message_bytes_total: IntCounterVec,

    /// `mz_compute_controller_replica_count`: number of replicas (label: instance_id).
    replica_count: UIntGaugeVec,
    /// `mz_compute_controller_collection_count`: installed compute collections.
    collection_count: UIntGaugeVec,
    /// `mz_compute_controller_collection_unscheduled_count`: installed but unscheduled
    /// collections.
    collection_unscheduled_count: UIntGaugeVec,
    /// `mz_compute_controller_peek_count`: pending peeks.
    peek_count: UIntGaugeVec,
    /// `mz_compute_controller_subscribe_count`: active subscribes.
    subscribe_count: UIntGaugeVec,
    /// `mz_compute_controller_copy_to_count`: active copy-tos.
    copy_to_count: UIntGaugeVec,
    /// `mz_compute_controller_command_queue_size`: command queue size, per replica.
    command_queue_size: UIntGaugeVec,
    /// `mz_compute_controller_response_send_count`: sends on the response queue.
    response_send_count: IntCounterVec,
    /// `mz_compute_controller_response_recv_count`: receives on the response queue.
    response_recv_count: IntCounterVec,
    /// `mz_compute_controller_hydration_queue_size`: hydration queue size, per replica.
    hydration_queue_size: UIntGaugeVec,

    /// `mz_compute_controller_history_command_count`: commands in the command history, per
    /// command type.
    history_command_count: UIntGaugeVec,
    /// `mz_compute_controller_history_dataflow_count`: dataflows in the command history.
    history_dataflow_count: UIntGaugeVec,

    /// `mz_compute_peeks_total`: peeks served, per result.
    peeks_total: IntCounterVec,
    /// `mz_compute_peek_duration_seconds`: histogram of peek durations, per result.
    peek_duration_seconds: HistogramVec,

    /// `mz_compute_controller_connected_replica_count`: replicas connected to the controller.
    connected_replica_count: UIntGaugeVec,
    /// `mz_compute_controller_replica_connects_total`: replica (re-)connections, per replica.
    replica_connects_total: IntCounterVec,
    /// `mz_compute_controller_replica_connect_wait_time_seconds_total`: time spent waiting for
    /// replica (re-)connection, per replica.
    replica_connect_wait_time_seconds_total: CounterVec,

    /// Metrics handed in by the caller of `new`; used to obtain per-collection wallclock lag
    /// metrics in `ReplicaMetrics::for_collection`.
    shared: ControllerMetrics,
}
76
77impl ComputeControllerMetrics {
78 pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
80 ComputeControllerMetrics {
81 commands_total: metrics_registry.register(metric!(
82 name: "mz_compute_commands_total",
83 help: "The total number of compute commands sent.",
84 var_labels: ["instance_id", "replica_id", "command_type"],
85 )),
86 command_message_bytes_total: metrics_registry.register(metric!(
87 name: "mz_compute_command_message_bytes_total",
88 help: "The total number of bytes sent in compute command messages.",
89 var_labels: ["instance_id", "replica_id"],
90 )),
91 responses_total: metrics_registry.register(metric!(
92 name: "mz_compute_responses_total",
93 help: "The total number of compute responses sent.",
94 var_labels: ["instance_id", "replica_id", "response_type"],
95 )),
96 response_message_bytes_total: metrics_registry.register(metric!(
97 name: "mz_compute_response_message_bytes_total",
98 help: "The total number of bytes sent in compute response messages.",
99 var_labels: ["instance_id", "replica_id"],
100 )),
101 replica_count: metrics_registry.register(metric!(
102 name: "mz_compute_controller_replica_count",
103 help: "The number of replicas.",
104 var_labels: ["instance_id"],
105 )),
106 collection_count: metrics_registry.register(metric!(
107 name: "mz_compute_controller_collection_count",
108 help: "The number of installed compute collections.",
109 var_labels: ["instance_id"],
110 )),
111 collection_unscheduled_count: metrics_registry.register(metric!(
112 name: "mz_compute_controller_collection_unscheduled_count",
113 help: "The number of installed but unscheduled compute collections.",
114 var_labels: ["instance_id"],
115 )),
116 peek_count: metrics_registry.register(metric!(
117 name: "mz_compute_controller_peek_count",
118 help: "The number of pending peeks.",
119 var_labels: ["instance_id"],
120 )),
121 subscribe_count: metrics_registry.register(metric!(
122 name: "mz_compute_controller_subscribe_count",
123 help: "The number of active subscribes.",
124 var_labels: ["instance_id"],
125 )),
126 copy_to_count: metrics_registry.register(metric!(
127 name: "mz_compute_controller_copy_to_count",
128 help: "The number of active copy tos.",
129 var_labels: ["instance_id"],
130 )),
131 command_queue_size: metrics_registry.register(metric!(
132 name: "mz_compute_controller_command_queue_size",
133 help: "The size of the compute command queue.",
134 var_labels: ["instance_id", "replica_id"],
135 )),
136 response_send_count: metrics_registry.register(metric!(
137 name: "mz_compute_controller_response_send_count",
138 help: "The number of sends on the compute response queue.",
139 var_labels: ["instance_id"],
140 )),
141 response_recv_count: metrics_registry.register(metric!(
142 name: "mz_compute_controller_response_recv_count",
143 help: "The number of receives on the compute response queue.",
144 var_labels: ["instance_id"],
145 )),
146 hydration_queue_size: metrics_registry.register(metric!(
147 name: "mz_compute_controller_hydration_queue_size",
148 help: "The size of the compute hydration queue.",
149 var_labels: ["instance_id", "replica_id"],
150 )),
151 history_command_count: metrics_registry.register(metric!(
152 name: "mz_compute_controller_history_command_count",
153 help: "The number of commands in the controller's command history.",
154 var_labels: ["instance_id", "command_type"],
155 )),
156 history_dataflow_count: metrics_registry.register(metric!(
157 name: "mz_compute_controller_history_dataflow_count",
158 help: "The number of dataflows in the controller's command history.",
159 var_labels: ["instance_id"],
160 )),
161 peeks_total: metrics_registry.register(metric!(
162 name: "mz_compute_peeks_total",
163 help: "The total number of peeks served.",
164 var_labels: ["instance_id", "result"],
165 )),
166 peek_duration_seconds: metrics_registry.register(metric!(
167 name: "mz_compute_peek_duration_seconds",
168 help: "A histogram of peek durations since restart.",
169 var_labels: ["instance_id", "result"],
170 buckets: histogram_seconds_buckets(0.000_500, 32.),
171 )),
172 connected_replica_count: metrics_registry.register(metric!(
173 name: "mz_compute_controller_connected_replica_count",
174 help: "The number of replicas successfully connected to the compute controller.",
175 var_labels: ["instance_id"],
176 )),
177 replica_connects_total: metrics_registry.register(metric!(
178 name: "mz_compute_controller_replica_connects_total",
179 help: "The total number of replica (re-)connections made by the compute controller.",
180 var_labels: ["instance_id", "replica_id"],
181 )),
182 replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
183 name: "mz_compute_controller_replica_connect_wait_time_seconds_total",
184 help: "The total time the compute controller spent waiting for replica (re-)connection.",
185 var_labels: ["instance_id", "replica_id"],
186 )),
187
188 shared,
189 }
190 }
191
192 pub fn for_instance(&self, instance_id: ComputeInstanceId) -> InstanceMetrics {
194 let labels = vec![instance_id.to_string()];
195 let replica_count = self.replica_count.get_delete_on_drop_metric(labels.clone());
196 let collection_count = self
197 .collection_count
198 .get_delete_on_drop_metric(labels.clone());
199 let collection_unscheduled_count = self
200 .collection_unscheduled_count
201 .get_delete_on_drop_metric(labels.clone());
202 let peek_count = self.peek_count.get_delete_on_drop_metric(labels.clone());
203 let subscribe_count = self
204 .subscribe_count
205 .get_delete_on_drop_metric(labels.clone());
206 let copy_to_count = self.copy_to_count.get_delete_on_drop_metric(labels.clone());
207 let history_command_count = CommandMetrics::build(|typ| {
208 let labels = labels.iter().cloned().chain([typ.into()]).collect();
209 self.history_command_count.get_delete_on_drop_metric(labels)
210 });
211 let history_dataflow_count = self
212 .history_dataflow_count
213 .get_delete_on_drop_metric(labels.clone());
214 let peeks_total = PeekMetrics::build(|typ| {
215 let labels = labels.iter().cloned().chain([typ.into()]).collect();
216 self.peeks_total.get_delete_on_drop_metric(labels)
217 });
218 let peek_duration_seconds = PeekMetrics::build(|typ| {
219 let labels = labels.iter().cloned().chain([typ.into()]).collect();
220 self.peek_duration_seconds.get_delete_on_drop_metric(labels)
221 });
222 let response_send_count = self
223 .response_send_count
224 .get_delete_on_drop_metric(labels.clone());
225 let response_recv_count = self
226 .response_recv_count
227 .get_delete_on_drop_metric(labels.clone());
228 let connected_replica_count = self
229 .connected_replica_count
230 .get_delete_on_drop_metric(labels);
231
232 InstanceMetrics {
233 instance_id,
234 metrics: self.clone(),
235 replica_count,
236 collection_count,
237 collection_unscheduled_count,
238 copy_to_count,
239 peek_count,
240 subscribe_count,
241 history_command_count,
242 history_dataflow_count,
243 peeks_total,
244 peek_duration_seconds,
245 response_send_count,
246 response_recv_count,
247 connected_replica_count,
248 }
249 }
250}
251
/// Metric handles for a single compute instance, created by
/// `ComputeControllerMetrics::for_instance`.
///
/// Every series carries the instance's `instance_id` label.
#[derive(Debug)]
pub struct InstanceMetrics {
    /// The instance these metrics describe; used as the `instance_id` label.
    instance_id: ComputeInstanceId,
    /// Controller-wide metric vectors, used to derive replica and history metrics.
    metrics: ComputeControllerMetrics,

    /// Number of replicas (`mz_compute_controller_replica_count`).
    pub replica_count: UIntGauge,
    /// Number of installed compute collections (`mz_compute_controller_collection_count`).
    pub collection_count: UIntGauge,
    /// Number of installed but unscheduled collections
    /// (`mz_compute_controller_collection_unscheduled_count`).
    pub collection_unscheduled_count: UIntGauge,
    /// Number of pending peeks (`mz_compute_controller_peek_count`).
    pub peek_count: UIntGauge,
    /// Number of active subscribes (`mz_compute_controller_subscribe_count`).
    pub subscribe_count: UIntGauge,
    /// Number of active copy-tos (`mz_compute_controller_copy_to_count`).
    pub copy_to_count: UIntGauge,
    /// Commands in the command history, per command type
    /// (`mz_compute_controller_history_command_count`).
    pub history_command_count: CommandMetrics<UIntGauge>,
    /// Dataflows in the command history (`mz_compute_controller_history_dataflow_count`).
    pub history_dataflow_count: UIntGauge,
    /// Peeks served, per result (`mz_compute_peeks_total`).
    pub peeks_total: PeekMetrics<IntCounter>,
    /// Peek durations in seconds, per result (`mz_compute_peek_duration_seconds`).
    pub peek_duration_seconds: PeekMetrics<Histogram>,
    /// Sends on the compute response queue (`mz_compute_controller_response_send_count`).
    pub response_send_count: IntCounter,
    /// Receives on the compute response queue (`mz_compute_controller_response_recv_count`).
    pub response_recv_count: IntCounter,
    /// Replicas connected to the controller (`mz_compute_controller_connected_replica_count`).
    pub connected_replica_count: UIntGauge,
}
285
286impl InstanceMetrics {
287 pub fn for_replica(&self, replica_id: ReplicaId) -> ReplicaMetrics {
289 let labels = vec![self.instance_id.to_string(), replica_id.to_string()];
290 let extended_labels = |extra: &str| {
291 labels
292 .iter()
293 .cloned()
294 .chain([extra.into()])
295 .collect::<Vec<_>>()
296 };
297
298 let commands_total = CommandMetrics::build(|typ| {
299 let labels = extended_labels(typ);
300 self.metrics
301 .commands_total
302 .get_delete_on_drop_metric(labels)
303 });
304 let responses_total = ResponseMetrics::build(|typ| {
305 let labels = extended_labels(typ);
306 self.metrics
307 .responses_total
308 .get_delete_on_drop_metric(labels)
309 });
310
311 let command_message_bytes_total = self
312 .metrics
313 .command_message_bytes_total
314 .get_delete_on_drop_metric(labels.clone());
315 let response_message_bytes_total = self
316 .metrics
317 .response_message_bytes_total
318 .get_delete_on_drop_metric(labels.clone());
319
320 let command_queue_size = self
321 .metrics
322 .command_queue_size
323 .get_delete_on_drop_metric(labels.clone());
324 let hydration_queue_size = self
325 .metrics
326 .hydration_queue_size
327 .get_delete_on_drop_metric(labels.clone());
328
329 let replica_connects_total = self
330 .metrics
331 .replica_connects_total
332 .get_delete_on_drop_metric(labels.clone());
333 let replica_connect_wait_time_seconds_total = self
334 .metrics
335 .replica_connect_wait_time_seconds_total
336 .get_delete_on_drop_metric(labels);
337
338 ReplicaMetrics {
339 instance_id: self.instance_id,
340 replica_id,
341 metrics: self.metrics.clone(),
342 inner: Arc::new(ReplicaMetricsInner {
343 commands_total,
344 command_message_bytes_total,
345 responses_total,
346 response_message_bytes_total,
347 command_queue_size,
348 hydration_queue_size,
349 replica_connects_total,
350 replica_connect_wait_time_seconds_total,
351 }),
352 }
353 }
354
355 pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
357 let labels = vec![self.instance_id.to_string()];
358 let command_counts = CommandMetrics::build(|typ| {
359 let labels = labels.iter().cloned().chain([typ.into()]).collect();
360 self.metrics
361 .history_command_count
362 .get_delete_on_drop_metric(labels)
363 });
364 let dataflow_count = self
365 .metrics
366 .history_dataflow_count
367 .get_delete_on_drop_metric(labels);
368
369 HistoryMetrics {
370 command_counts,
371 dataflow_count,
372 }
373 }
374
375 pub fn observe_peek_response(&self, response: &PeekResponse, duration: Duration) {
377 self.peeks_total.for_peek_response(response).inc();
378 self.peek_duration_seconds
379 .for_peek_response(response)
380 .observe(duration.as_secs_f64());
381 }
382}
383
/// Metric handles for a single replica, created by `InstanceMetrics::for_replica`.
///
/// Clones share the same `ReplicaMetricsInner` through the `Arc` in `inner`.
#[derive(Debug, Clone)]
pub struct ReplicaMetrics {
    /// Instance the replica belongs to; used as a label for per-collection metrics.
    instance_id: ComputeInstanceId,
    /// The replica these metrics describe; used as a label for per-collection metrics.
    replica_id: ReplicaId,
    /// Controller-wide metrics, used to derive per-collection metrics.
    metrics: ComputeControllerMetrics,

    /// Replica-specific metric handles, shared between clones.
    pub inner: Arc<ReplicaMetricsInner>,
}
394
/// Replica-specific metric handles, created by `InstanceMetrics::for_replica`.
///
/// Every series carries the `instance_id` and `replica_id` labels.
#[derive(Debug)]
pub struct ReplicaMetricsInner {
    /// Commands sent to the replica, per command type (`mz_compute_commands_total`).
    commands_total: CommandMetrics<IntCounter>,
    /// Bytes of command messages sent (`mz_compute_command_message_bytes_total`).
    command_message_bytes_total: IntCounter,
    /// Responses received from the replica, per response type (`mz_compute_responses_total`).
    responses_total: ResponseMetrics<IntCounter>,
    /// Bytes of response messages received (`mz_compute_response_message_bytes_total`).
    response_message_bytes_total: IntCounter,

    /// Size of the command queue (`mz_compute_controller_command_queue_size`).
    pub command_queue_size: UIntGauge,
    /// Size of the hydration queue (`mz_compute_controller_hydration_queue_size`).
    pub hydration_queue_size: UIntGauge,

    /// Replica (re-)connections (`mz_compute_controller_replica_connects_total`).
    replica_connects_total: IntCounter,
    /// Seconds spent waiting for replica (re-)connection
    /// (`mz_compute_controller_replica_connect_wait_time_seconds_total`).
    replica_connect_wait_time_seconds_total: Counter,
}
413
414impl ReplicaMetrics {
415 pub(crate) fn for_collection(
416 &self,
417 collection_id: GlobalId,
418 ) -> Option<ReplicaCollectionMetrics> {
419 if collection_id.is_transient() {
424 return None;
425 }
426
427 let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
428 collection_id.to_string(),
429 Some(self.instance_id.to_string()),
430 Some(self.replica_id.to_string()),
431 );
432
433 Some(ReplicaCollectionMetrics { wallclock_lag })
434 }
435
436 pub(crate) fn observe_connect(&self) {
438 self.inner.replica_connects_total.inc();
439 }
440
441 pub(crate) fn observe_connect_time(&self, wait_time: Duration) {
443 self.inner
444 .replica_connect_wait_time_seconds_total
445 .inc_by(wait_time.as_secs_f64());
446 }
447}
448
449impl<T> transport::Metrics<ComputeCommand<T>, ComputeResponse<T>> for ReplicaMetrics {
450 fn bytes_sent(&mut self, len: usize) {
451 self.inner
452 .command_message_bytes_total
453 .inc_by(u64::cast_from(len));
454 }
455
456 fn bytes_received(&mut self, len: usize) {
457 self.inner
458 .response_message_bytes_total
459 .inc_by(u64::cast_from(len));
460 }
461
462 fn message_sent(&mut self, msg: &ComputeCommand<T>) {
463 self.inner.commands_total.for_command(msg).inc();
464 }
465
466 fn message_received(&mut self, msg: &ComputeResponse<T>) {
467 self.inner.responses_total.for_response(msg).inc();
468 }
469}
470
/// Per-collection metrics for a single replica, produced by `ReplicaMetrics::for_collection`.
#[derive(Debug)]
pub(crate) struct ReplicaCollectionMetrics {
    /// Wallclock lag metrics for the collection, obtained from the shared controller metrics.
    pub wallclock_lag: WallclockLagMetrics,
}
477
/// One metric of type `M` per compute command type.
///
/// Field names double as the `command_type` label values passed to the builder in
/// `CommandMetrics::build`.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metric for `ComputeCommand::Hello`.
    pub hello: M,
    /// Metric for `ComputeCommand::CreateInstance`.
    pub create_instance: M,
    /// Metric for `ComputeCommand::CreateDataflow`.
    pub create_dataflow: M,
    /// Metric for `ComputeCommand::Schedule`.
    pub schedule: M,
    /// Metric for `ComputeCommand::AllowCompaction`.
    pub allow_compaction: M,
    /// Metric for `ComputeCommand::Peek`.
    pub peek: M,
    /// Metric for `ComputeCommand::CancelPeek`.
    pub cancel_peek: M,
    /// Metric for `ComputeCommand::InitializationComplete`.
    pub initialization_complete: M,
    /// Metric for `ComputeCommand::UpdateConfiguration`.
    pub update_configuration: M,
    /// Metric for `ComputeCommand::AllowWrites`.
    pub allow_writes: M,
}
502
503impl<M> CommandMetrics<M> {
504 pub fn build<F>(build_metric: F) -> Self
506 where
507 F: Fn(&str) -> M,
508 {
509 Self {
510 hello: build_metric("hello"),
511 create_instance: build_metric("create_instance"),
512 create_dataflow: build_metric("create_dataflow"),
513 schedule: build_metric("schedule"),
514 allow_compaction: build_metric("allow_compaction"),
515 peek: build_metric("peek"),
516 cancel_peek: build_metric("cancel_peek"),
517 initialization_complete: build_metric("initialization_complete"),
518 update_configuration: build_metric("update_configuration"),
519 allow_writes: build_metric("allow_writes"),
520 }
521 }
522
523 fn for_all<F>(&self, f: F)
524 where
525 F: Fn(&M),
526 {
527 f(&self.hello);
528 f(&self.create_instance);
529 f(&self.initialization_complete);
530 f(&self.update_configuration);
531 f(&self.create_dataflow);
532 f(&self.schedule);
533 f(&self.allow_compaction);
534 f(&self.peek);
535 f(&self.cancel_peek);
536 f(&self.allow_writes);
537 }
538
539 pub fn for_command<T>(&self, command: &ComputeCommand<T>) -> &M {
541 use ComputeCommand::*;
542
543 match command {
544 Hello { .. } => &self.hello,
545 CreateInstance(_) => &self.create_instance,
546 InitializationComplete => &self.initialization_complete,
547 UpdateConfiguration(_) => &self.update_configuration,
548 CreateDataflow(_) => &self.create_dataflow,
549 Schedule(_) => &self.schedule,
550 AllowCompaction { .. } => &self.allow_compaction,
551 Peek(_) => &self.peek,
552 CancelPeek { .. } => &self.cancel_peek,
553 AllowWrites { .. } => &self.allow_writes,
554 }
555 }
556}
557
/// One metric of type `M` per compute response type.
///
/// Field names double as the `response_type` label values passed to the builder in
/// `ResponseMetrics::build`.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `ComputeResponse::Frontiers`.
    frontiers: M,
    /// Metric for `ComputeResponse::PeekResponse`.
    peek_response: M,
    /// Metric for `ComputeResponse::SubscribeResponse`.
    subscribe_response: M,
    /// Metric for `ComputeResponse::CopyToResponse`.
    copy_to_response: M,
    /// Metric for `ComputeResponse::Status`.
    status: M,
}
567
568impl<M> ResponseMetrics<M> {
569 fn build<F>(build_metric: F) -> Self
570 where
571 F: Fn(&str) -> M,
572 {
573 Self {
574 frontiers: build_metric("frontiers"),
575 peek_response: build_metric("peek_response"),
576 subscribe_response: build_metric("subscribe_response"),
577 copy_to_response: build_metric("copy_to_response"),
578 status: build_metric("status"),
579 }
580 }
581
582 fn for_response<T>(&self, response: &ComputeResponse<T>) -> &M {
583 use ComputeResponse::*;
584
585 match response {
586 Frontiers(..) => &self.frontiers,
587 PeekResponse(..) => &self.peek_response,
588 SubscribeResponse(..) => &self.subscribe_response,
589 CopyToResponse(..) => &self.copy_to_response,
590 Status(..) => &self.status,
591 }
592 }
593}
594
/// Gauges describing the contents of a compute command history.
#[derive(Debug)]
pub struct HistoryMetrics<G> {
    /// Number of commands in the history, per command type.
    pub command_counts: CommandMetrics<G>,
    /// Number of dataflows in the history.
    pub dataflow_count: G,
}
603
604impl<G> HistoryMetrics<G>
605where
606 G: Borrow<mz_ore::metrics::UIntGauge>,
607{
608 pub fn reset(&self) {
610 self.command_counts.for_all(|m| m.borrow().set(0));
611 self.dataflow_count.borrow().set(0);
612 }
613}
614
/// One metric of type `M` per peek result kind.
///
/// The `result` label values passed to the builder in `PeekMetrics::build` are `"rows"`,
/// `"rows_stashed"`, `"error"`, and `"canceled"`.
#[derive(Debug)]
pub struct PeekMetrics<M> {
    /// Peeks answered with `PeekResponse::Rows`.
    rows: M,
    /// Peeks answered with `PeekResponse::Stashed`.
    rows_stashed: M,
    /// Peeks answered with `PeekResponse::Error`.
    error: M,
    /// Peeks answered with `PeekResponse::Canceled`.
    canceled: M,
}
623
624impl<M> PeekMetrics<M> {
625 fn build<F>(build_metric: F) -> Self
626 where
627 F: Fn(&str) -> M,
628 {
629 Self {
630 rows: build_metric("rows"),
631 rows_stashed: build_metric("rows_stashed"),
632 error: build_metric("error"),
633 canceled: build_metric("canceled"),
634 }
635 }
636
637 fn for_peek_response(&self, response: &PeekResponse) -> &M {
638 use PeekResponse::*;
639
640 match response {
641 Rows(_) => &self.rows,
642 Stashed(_) => &self.rows_stashed,
643 Error(_) => &self.error,
644 Canceled => &self.canceled,
645 }
646 }
647}