1use std::borrow::Borrow;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_cluster_client::ReplicaId;
17use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
18use mz_compute_types::ComputeInstanceId;
19use mz_ore::cast::CastFrom;
20use mz_ore::metric;
21use mz_ore::metrics::raw::UIntGaugeVec;
22use mz_ore::metrics::{
23 CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec,
24 IntCounterVec, MetricVecExt, MetricsRegistry, Rule,
25};
26use mz_ore::stats::histogram_seconds_buckets;
27use mz_repr::GlobalId;
28use mz_service::transport;
29use prometheus::core::{AtomicF64, AtomicU64};
30
31use crate::protocol::command::ComputeCommand;
32use crate::protocol::response::{ComputeResponse, PeekResponse};
33
/// Delete-on-drop handle for a float (`AtomicF64`) counter whose label values are a `Vec<String>`.
pub(crate) type Counter = DeleteOnDropCounter<AtomicF64, Vec<String>>;
/// Delete-on-drop handle for an integer (`AtomicU64`) counter whose label values are a `Vec<String>`.
pub(crate) type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
/// Delete-on-drop handle for an unsigned (`AtomicU64`) gauge whose label values are a `Vec<String>`.
pub(crate) type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
/// Delete-on-drop handle for a histogram whose label values are a `Vec<String>`.
type Histogram = DeleteOnDropHistogram<Vec<String>>;
38
/// Metric vectors registered by the compute controller.
///
/// Each field is a labeled metric vector; per-instance and per-replica handles are derived
/// from these via `for_instance` and `InstanceMetrics::for_replica`.
#[derive(Debug, Clone)]
pub struct ComputeControllerMetrics {
    /// Total compute commands sent, labeled by instance, replica, and command type.
    commands_total: IntCounterVec,
    /// Total bytes sent in compute command messages, labeled by instance and replica.
    command_message_bytes_total: IntCounterVec,
    /// Total compute responses, labeled by instance, replica, and response type.
    responses_total: IntCounterVec,
    /// Total bytes in compute response messages, labeled by instance and replica.
    response_message_bytes_total: IntCounterVec,

    /// Number of replicas, per instance.
    replica_count: UIntGaugeVec,
    /// Number of installed compute collections, per instance.
    collection_count: UIntGaugeVec,
    /// Number of installed but unscheduled compute collections, per instance.
    collection_unscheduled_count: UIntGaugeVec,
    /// Number of pending peeks, per instance.
    peek_count: UIntGaugeVec,
    /// Number of active subscribes, per instance.
    subscribe_count: UIntGaugeVec,
    /// Number of active COPY TOs, per instance.
    copy_to_count: UIntGaugeVec,
    /// Size of the compute command queue, per instance and replica.
    command_queue_size: UIntGaugeVec,
    /// Number of sends on the compute response queue, per instance.
    response_send_count: IntCounterVec,
    /// Number of receives on the compute response queue, per instance.
    response_recv_count: IntCounterVec,
    /// Size of the compute hydration queue, per instance and replica.
    hydration_queue_size: UIntGaugeVec,

    /// Number of commands in the controller's command history, per instance and command type.
    history_command_count: UIntGaugeVec,
    /// Number of dataflows in the controller's command history, per instance.
    history_dataflow_count: UIntGaugeVec,

    /// Total peeks served, per instance and peek result.
    peeks_total: IntCounterVec,
    /// Histogram of peek durations, per instance and peek result.
    peek_duration_seconds: HistogramVec,

    /// Number of replicas successfully connected to the controller, per instance.
    connected_replica_count: UIntGaugeVec,
    /// Total replica (re-)connections made, per instance and replica.
    replica_connects_total: IntCounterVec,
    /// Total time spent waiting for replica (re-)connection, per instance and replica.
    replica_connect_wait_time_seconds_total: CounterVec,

    /// Shared controller metrics; used to create per-collection wallclock lag metrics.
    shared: ControllerMetrics,
}
76
77fn instance_name_rule() -> Rule {
78 Rule::ClusterNameLookup {
79 cluster_id_label: "instance_id".into(),
80 output_label: "instance_name".into(),
81 }
82}
83
84fn replica_name_rule() -> Rule {
85 Rule::ReplicaNameLookup {
86 cluster_id_label: "instance_id".into(),
87 replica_id_label: "replica_id".into(),
88 output_label: "replica_name".into(),
89 }
90}
91
92impl ComputeControllerMetrics {
93 pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
95 let metrics = ComputeControllerMetrics {
96 commands_total: metrics_registry.register(metric!(
97 name: "mz_compute_commands_total",
98 help: "The total number of compute commands sent.",
99 var_labels: ["instance_id", "replica_id", "command_type"],
100 rules: [instance_name_rule(), replica_name_rule()],
101 )),
102 command_message_bytes_total: metrics_registry.register(metric!(
103 name: "mz_compute_command_message_bytes_total",
104 help: "The total number of bytes sent in compute command messages.",
105 var_labels: ["instance_id", "replica_id"],
106 rules: [instance_name_rule(), replica_name_rule()],
107 )),
108 responses_total: metrics_registry.register(metric!(
109 name: "mz_compute_responses_total",
110 help: "The total number of compute responses sent.",
111 var_labels: ["instance_id", "replica_id", "response_type"],
112 rules: [instance_name_rule(), replica_name_rule()],
113 )),
114 response_message_bytes_total: metrics_registry.register(metric!(
115 name: "mz_compute_response_message_bytes_total",
116 help: "The total number of bytes sent in compute response messages.",
117 var_labels: ["instance_id", "replica_id"],
118 rules: [instance_name_rule(), replica_name_rule()],
119 )),
120 replica_count: metrics_registry.register(metric!(
121 name: "mz_compute_controller_replica_count",
122 help: "The number of replicas.",
123 var_labels: ["instance_id"],
124 rules: [instance_name_rule()],
125 )),
126 collection_count: metrics_registry.register(metric!(
127 name: "mz_compute_controller_collection_count",
128 help: "The number of installed compute collections.",
129 var_labels: ["instance_id"],
130 rules: [instance_name_rule()],
131 )),
132 collection_unscheduled_count: metrics_registry.register(metric!(
133 name: "mz_compute_controller_collection_unscheduled_count",
134 help: "The number of installed but unscheduled compute collections.",
135 var_labels: ["instance_id"],
136 rules: [instance_name_rule()],
137 )),
138 peek_count: metrics_registry.register(metric!(
139 name: "mz_compute_controller_peek_count",
140 help: "The number of pending peeks.",
141 var_labels: ["instance_id"],
142 rules: [instance_name_rule()],
143 )),
144 subscribe_count: metrics_registry.register(metric!(
145 name: "mz_compute_controller_subscribe_count",
146 help: "The number of active subscribes.",
147 var_labels: ["instance_id"],
148 rules: [instance_name_rule()],
149 )),
150 copy_to_count: metrics_registry.register(metric!(
151 name: "mz_compute_controller_copy_to_count",
152 help: "The number of active copy tos.",
153 var_labels: ["instance_id"],
154 rules: [instance_name_rule()],
155 )),
156 command_queue_size: metrics_registry.register(metric!(
157 name: "mz_compute_controller_command_queue_size",
158 help: "The size of the compute command queue.",
159 var_labels: ["instance_id", "replica_id"],
160 rules: [instance_name_rule(), replica_name_rule()],
161 )),
162 response_send_count: metrics_registry.register(metric!(
163 name: "mz_compute_controller_response_send_count",
164 help: "The number of sends on the compute response queue.",
165 var_labels: ["instance_id"],
166 rules: [instance_name_rule()],
167 )),
168 response_recv_count: metrics_registry.register(metric!(
169 name: "mz_compute_controller_response_recv_count",
170 help: "The number of receives on the compute response queue.",
171 var_labels: ["instance_id"],
172 rules: [instance_name_rule()],
173 )),
174 hydration_queue_size: metrics_registry.register(metric!(
175 name: "mz_compute_controller_hydration_queue_size",
176 help: "The size of the compute hydration queue.",
177 var_labels: ["instance_id", "replica_id"],
178 rules: [instance_name_rule(), replica_name_rule()],
179 )),
180 history_command_count: metrics_registry.register(metric!(
181 name: "mz_compute_controller_history_command_count",
182 help: "The number of commands in the controller's command history.",
183 var_labels: ["instance_id", "command_type"],
184 rules: [instance_name_rule()],
185 )),
186 history_dataflow_count: metrics_registry.register(metric!(
187 name: "mz_compute_controller_history_dataflow_count",
188 help: "The number of dataflows in the controller's command history.",
189 var_labels: ["instance_id"],
190 rules: [instance_name_rule()],
191 )),
192 peeks_total: metrics_registry.register(metric!(
193 name: "mz_compute_peeks_total",
194 help: "The total number of peeks served.",
195 var_labels: ["instance_id", "result"],
196 rules: [instance_name_rule()],
197 )),
198 peek_duration_seconds: metrics_registry.register(metric!(
199 name: "mz_compute_peek_duration_seconds",
200 help: "A histogram of peek durations since restart.",
201 var_labels: ["instance_id", "result"],
202 buckets: histogram_seconds_buckets(0.000_500, 32.),
203 rules: [instance_name_rule()],
204 )),
205 connected_replica_count: metrics_registry.register(metric!(
206 name: "mz_compute_controller_connected_replica_count",
207 help: "The number of replicas successfully connected to the compute controller.",
208 var_labels: ["instance_id"],
209 rules: [instance_name_rule()],
210 )),
211 replica_connects_total: metrics_registry.register(metric!(
212 name: "mz_compute_controller_replica_connects_total",
213 help: "The total number of replica (re-)connections made by the compute controller.",
214 var_labels: ["instance_id", "replica_id"],
215 rules: [instance_name_rule(), replica_name_rule()],
216 )),
217 replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
218 name: "mz_compute_controller_replica_connect_wait_time_seconds_total",
219 help: "The total time the compute controller spent waiting for replica (re-)connection.",
220 var_labels: ["instance_id", "replica_id"],
221 rules: [instance_name_rule(), replica_name_rule()],
222 )),
223
224 shared,
225 };
226
227 metrics
228 }
229
230 pub fn for_instance(&self, instance_id: ComputeInstanceId) -> InstanceMetrics {
232 let labels = vec![instance_id.to_string()];
233 let replica_count = self.replica_count.get_delete_on_drop_metric(labels.clone());
234 let collection_count = self
235 .collection_count
236 .get_delete_on_drop_metric(labels.clone());
237 let collection_unscheduled_count = self
238 .collection_unscheduled_count
239 .get_delete_on_drop_metric(labels.clone());
240 let peek_count = self.peek_count.get_delete_on_drop_metric(labels.clone());
241 let subscribe_count = self
242 .subscribe_count
243 .get_delete_on_drop_metric(labels.clone());
244 let copy_to_count = self.copy_to_count.get_delete_on_drop_metric(labels.clone());
245 let history_command_count = CommandMetrics::build(|typ| {
246 let labels = labels.iter().cloned().chain([typ.into()]).collect();
247 self.history_command_count.get_delete_on_drop_metric(labels)
248 });
249 let history_dataflow_count = self
250 .history_dataflow_count
251 .get_delete_on_drop_metric(labels.clone());
252 let peeks_total = PeekMetrics::build(|typ| {
253 let labels = labels.iter().cloned().chain([typ.into()]).collect();
254 self.peeks_total.get_delete_on_drop_metric(labels)
255 });
256 let peek_duration_seconds = PeekMetrics::build(|typ| {
257 let labels = labels.iter().cloned().chain([typ.into()]).collect();
258 self.peek_duration_seconds.get_delete_on_drop_metric(labels)
259 });
260 let response_send_count = self
261 .response_send_count
262 .get_delete_on_drop_metric(labels.clone());
263 let response_recv_count = self
264 .response_recv_count
265 .get_delete_on_drop_metric(labels.clone());
266 let connected_replica_count = self
267 .connected_replica_count
268 .get_delete_on_drop_metric(labels);
269
270 InstanceMetrics {
271 instance_id,
272 metrics: self.clone(),
273 replica_count,
274 collection_count,
275 collection_unscheduled_count,
276 copy_to_count,
277 peek_count,
278 subscribe_count,
279 history_command_count,
280 history_dataflow_count,
281 peeks_total,
282 peek_duration_seconds,
283 response_send_count,
284 response_recv_count,
285 connected_replica_count,
286 }
287 }
288}
289
/// Metrics scoped to a single compute instance.
///
/// Produced by `ComputeControllerMetrics::for_instance`; every metric carries the instance's
/// `instance_id` label.
#[derive(Debug)]
pub struct InstanceMetrics {
    /// ID of the instance; used as the leading label of derived replica and history metrics.
    instance_id: ComputeInstanceId,
    /// Metric vectors from which replica and history metrics are derived.
    metrics: ComputeControllerMetrics,

    /// Gauge: number of replicas.
    pub replica_count: UIntGauge,
    /// Gauge: number of installed compute collections.
    pub collection_count: UIntGauge,
    /// Gauge: number of installed but unscheduled compute collections.
    pub collection_unscheduled_count: UIntGauge,
    /// Gauge: number of pending peeks.
    pub peek_count: UIntGauge,
    /// Gauge: number of active subscribes.
    pub subscribe_count: UIntGauge,
    /// Gauge: number of active COPY TOs.
    pub copy_to_count: UIntGauge,
    /// Gauges: number of commands in the command history, per command type.
    pub history_command_count: CommandMetrics<UIntGauge>,
    /// Gauge: number of dataflows in the command history.
    pub history_dataflow_count: UIntGauge,
    /// Counters: total peeks served, per peek result.
    pub peeks_total: PeekMetrics<IntCounter>,
    /// Histograms: peek durations in seconds, per peek result.
    pub peek_duration_seconds: PeekMetrics<Histogram>,
    /// Counter: sends on the compute response queue.
    pub response_send_count: IntCounter,
    /// Counter: receives on the compute response queue.
    pub response_recv_count: IntCounter,
    /// Gauge: number of replicas successfully connected to the controller.
    pub connected_replica_count: UIntGauge,
}
323
324impl InstanceMetrics {
325 pub fn for_replica(&self, replica_id: ReplicaId) -> ReplicaMetrics {
327 let labels = vec![self.instance_id.to_string(), replica_id.to_string()];
328 let extended_labels = |extra: &str| {
329 labels
330 .iter()
331 .cloned()
332 .chain([extra.into()])
333 .collect::<Vec<_>>()
334 };
335
336 let commands_total = CommandMetrics::build(|typ| {
337 let labels = extended_labels(typ);
338 self.metrics
339 .commands_total
340 .get_delete_on_drop_metric(labels)
341 });
342 let responses_total = ResponseMetrics::build(|typ| {
343 let labels = extended_labels(typ);
344 self.metrics
345 .responses_total
346 .get_delete_on_drop_metric(labels)
347 });
348
349 let command_message_bytes_total = self
350 .metrics
351 .command_message_bytes_total
352 .get_delete_on_drop_metric(labels.clone());
353 let response_message_bytes_total = self
354 .metrics
355 .response_message_bytes_total
356 .get_delete_on_drop_metric(labels.clone());
357
358 let command_queue_size = self
359 .metrics
360 .command_queue_size
361 .get_delete_on_drop_metric(labels.clone());
362 let hydration_queue_size = self
363 .metrics
364 .hydration_queue_size
365 .get_delete_on_drop_metric(labels.clone());
366
367 let replica_connects_total = self
368 .metrics
369 .replica_connects_total
370 .get_delete_on_drop_metric(labels.clone());
371 let replica_connect_wait_time_seconds_total = self
372 .metrics
373 .replica_connect_wait_time_seconds_total
374 .get_delete_on_drop_metric(labels);
375
376 ReplicaMetrics {
377 instance_id: self.instance_id,
378 replica_id,
379 metrics: self.metrics.clone(),
380 inner: Arc::new(ReplicaMetricsInner {
381 commands_total,
382 command_message_bytes_total,
383 responses_total,
384 response_message_bytes_total,
385 command_queue_size,
386 hydration_queue_size,
387 replica_connects_total,
388 replica_connect_wait_time_seconds_total,
389 }),
390 }
391 }
392
393 pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
395 let labels = vec![self.instance_id.to_string()];
396 let command_counts = CommandMetrics::build(|typ| {
397 let labels = labels.iter().cloned().chain([typ.into()]).collect();
398 self.metrics
399 .history_command_count
400 .get_delete_on_drop_metric(labels)
401 });
402 let dataflow_count = self
403 .metrics
404 .history_dataflow_count
405 .get_delete_on_drop_metric(labels);
406
407 HistoryMetrics {
408 command_counts,
409 dataflow_count,
410 }
411 }
412
413 pub fn observe_peek_response(&self, response: &PeekResponse, duration: Duration) {
415 self.peeks_total.for_peek_response(response).inc();
416 self.peek_duration_seconds
417 .for_peek_response(response)
418 .observe(duration.as_secs_f64());
419 }
420}
421
/// Metrics scoped to a single replica of a compute instance.
///
/// Produced by `InstanceMetrics::for_replica`. Clones share the same per-replica metric
/// handles through the `Arc` in `inner`.
#[derive(Debug, Clone)]
pub struct ReplicaMetrics {
    /// ID of the owning compute instance; used to label per-collection metrics.
    instance_id: ComputeInstanceId,
    /// ID of this replica; used to label per-collection metrics.
    replica_id: ReplicaId,
    /// Metric vectors, including the shared controller metrics used by `for_collection`.
    metrics: ComputeControllerMetrics,

    /// Per-replica metric handles, shared between clones.
    pub inner: Arc<ReplicaMetricsInner>,
}
432
/// Per-replica metric handles, labeled with the instance and replica IDs.
#[derive(Debug)]
pub struct ReplicaMetricsInner {
    /// Counters of commands sent, per command type.
    commands_total: CommandMetrics<IntCounter>,
    /// Counter of bytes sent in command messages.
    command_message_bytes_total: IntCounter,
    /// Counters of responses, per response type.
    responses_total: ResponseMetrics<IntCounter>,
    /// Counter of bytes in response messages.
    response_message_bytes_total: IntCounter,

    /// Gauge: size of the compute command queue.
    pub command_queue_size: UIntGauge,
    /// Gauge: size of the compute hydration queue.
    pub hydration_queue_size: UIntGauge,

    /// Counter: replica (re-)connections made.
    replica_connects_total: IntCounter,
    /// Counter: total seconds spent waiting for replica (re-)connection.
    replica_connect_wait_time_seconds_total: Counter,
}
451
452impl ReplicaMetrics {
453 pub(crate) fn for_collection(
454 &self,
455 collection_id: GlobalId,
456 ) -> Option<ReplicaCollectionMetrics> {
457 if collection_id.is_transient() {
462 return None;
463 }
464
465 let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
466 collection_id.to_string(),
467 Some(self.instance_id.to_string()),
468 Some(self.replica_id.to_string()),
469 );
470
471 Some(ReplicaCollectionMetrics { wallclock_lag })
472 }
473
474 pub(crate) fn observe_connect(&self) {
476 self.inner.replica_connects_total.inc();
477 }
478
479 pub(crate) fn observe_connect_time(&self, wait_time: Duration) {
481 self.inner
482 .replica_connect_wait_time_seconds_total
483 .inc_by(wait_time.as_secs_f64());
484 }
485}
486
487impl transport::Metrics<ComputeCommand, ComputeResponse> for ReplicaMetrics {
488 fn bytes_sent(&mut self, len: usize) {
489 self.inner
490 .command_message_bytes_total
491 .inc_by(u64::cast_from(len));
492 }
493
494 fn bytes_received(&mut self, len: usize) {
495 self.inner
496 .response_message_bytes_total
497 .inc_by(u64::cast_from(len));
498 }
499
500 fn message_sent(&mut self, msg: &ComputeCommand) {
501 self.inner.commands_total.for_command(msg).inc();
502 }
503
504 fn message_received(&mut self, msg: &ComputeResponse) {
505 self.inner.responses_total.for_response(msg).inc();
506 }
507}
508
/// Metrics scoped to a single (non-transient) collection on a single replica.
#[derive(Debug)]
pub(crate) struct ReplicaCollectionMetrics {
    /// Wallclock lag metrics for the collection, created from the shared controller metrics.
    pub wallclock_lag: WallclockLagMetrics,
}
515
/// One metric of type `M` per compute command type.
///
/// `CommandMetrics::build` creates each field by passing the field name as the type label
/// value. `for_command` selects the field matching a `ComputeCommand` variant.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metric for `ComputeCommand::Hello`.
    pub hello: M,
    /// Metric for `ComputeCommand::CreateInstance`.
    pub create_instance: M,
    /// Metric for `ComputeCommand::CreateDataflow`.
    pub create_dataflow: M,
    /// Metric for `ComputeCommand::Schedule`.
    pub schedule: M,
    /// Metric for `ComputeCommand::AllowCompaction`.
    pub allow_compaction: M,
    /// Metric for `ComputeCommand::Peek`.
    pub peek: M,
    /// Metric for `ComputeCommand::CancelPeek`.
    pub cancel_peek: M,
    /// Metric for `ComputeCommand::InitializationComplete`.
    pub initialization_complete: M,
    /// Metric for `ComputeCommand::UpdateConfiguration`.
    pub update_configuration: M,
    /// Metric for `ComputeCommand::AllowWrites`.
    pub allow_writes: M,
}
540
541impl<M> CommandMetrics<M> {
542 pub fn build<F>(build_metric: F) -> Self
544 where
545 F: Fn(&str) -> M,
546 {
547 Self {
548 hello: build_metric("hello"),
549 create_instance: build_metric("create_instance"),
550 create_dataflow: build_metric("create_dataflow"),
551 schedule: build_metric("schedule"),
552 allow_compaction: build_metric("allow_compaction"),
553 peek: build_metric("peek"),
554 cancel_peek: build_metric("cancel_peek"),
555 initialization_complete: build_metric("initialization_complete"),
556 update_configuration: build_metric("update_configuration"),
557 allow_writes: build_metric("allow_writes"),
558 }
559 }
560
561 fn for_all<F>(&self, f: F)
562 where
563 F: Fn(&M),
564 {
565 f(&self.hello);
566 f(&self.create_instance);
567 f(&self.initialization_complete);
568 f(&self.update_configuration);
569 f(&self.create_dataflow);
570 f(&self.schedule);
571 f(&self.allow_compaction);
572 f(&self.peek);
573 f(&self.cancel_peek);
574 f(&self.allow_writes);
575 }
576
577 pub fn for_command(&self, command: &ComputeCommand) -> &M {
579 use ComputeCommand::*;
580
581 match command {
582 Hello { .. } => &self.hello,
583 CreateInstance(_) => &self.create_instance,
584 InitializationComplete => &self.initialization_complete,
585 UpdateConfiguration(_) => &self.update_configuration,
586 CreateDataflow(_) => &self.create_dataflow,
587 Schedule(_) => &self.schedule,
588 AllowCompaction { .. } => &self.allow_compaction,
589 Peek(_) => &self.peek,
590 CancelPeek { .. } => &self.cancel_peek,
591 AllowWrites { .. } => &self.allow_writes,
592 }
593 }
594}
595
/// One metric of type `M` per compute response type.
///
/// `ResponseMetrics::build` creates each field by passing the field name as the type label
/// value. `for_response` selects the field matching a `ComputeResponse` variant.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `ComputeResponse::Frontiers`.
    frontiers: M,
    /// Metric for `ComputeResponse::PeekResponse`.
    peek_response: M,
    /// Metric for `ComputeResponse::SubscribeResponse`.
    subscribe_response: M,
    /// Metric for `ComputeResponse::CopyToResponse`.
    copy_to_response: M,
    /// Metric for `ComputeResponse::Status`.
    status: M,
}
605
606impl<M> ResponseMetrics<M> {
607 fn build<F>(build_metric: F) -> Self
608 where
609 F: Fn(&str) -> M,
610 {
611 Self {
612 frontiers: build_metric("frontiers"),
613 peek_response: build_metric("peek_response"),
614 subscribe_response: build_metric("subscribe_response"),
615 copy_to_response: build_metric("copy_to_response"),
616 status: build_metric("status"),
617 }
618 }
619
620 fn for_response(&self, response: &ComputeResponse) -> &M {
621 use ComputeResponse::*;
622
623 match response {
624 Frontiers(..) => &self.frontiers,
625 PeekResponse(..) => &self.peek_response,
626 SubscribeResponse(..) => &self.subscribe_response,
627 CopyToResponse(..) => &self.copy_to_response,
628 Status(..) => &self.status,
629 }
630 }
631}
632
/// Gauges describing the contents of a command history.
#[derive(Debug)]
pub struct HistoryMetrics<G> {
    /// Number of commands in the history, per command type.
    pub command_counts: CommandMetrics<G>,
    /// Number of dataflows in the history.
    pub dataflow_count: G,
}
641
642impl<G> HistoryMetrics<G>
643where
644 G: Borrow<mz_ore::metrics::UIntGauge>,
645{
646 pub fn reset(&self) {
648 self.command_counts.for_all(|m| m.borrow().set(0));
649 self.dataflow_count.borrow().set(0);
650 }
651}
652
/// One metric of type `M` per peek result kind.
///
/// `PeekMetrics::build` creates each field by passing the field name as the `result` label
/// value. `for_peek_response` selects the field matching a `PeekResponse` variant.
#[derive(Debug)]
pub struct PeekMetrics<M> {
    /// Metric for `PeekResponse::Rows`.
    rows: M,
    /// Metric for `PeekResponse::Stashed`.
    rows_stashed: M,
    /// Metric for `PeekResponse::Error`.
    error: M,
    /// Metric for `PeekResponse::Canceled`.
    canceled: M,
}
661
662impl<M> PeekMetrics<M> {
663 fn build<F>(build_metric: F) -> Self
664 where
665 F: Fn(&str) -> M,
666 {
667 Self {
668 rows: build_metric("rows"),
669 rows_stashed: build_metric("rows_stashed"),
670 error: build_metric("error"),
671 canceled: build_metric("canceled"),
672 }
673 }
674
675 fn for_peek_response(&self, response: &PeekResponse) -> &M {
676 use PeekResponse::*;
677
678 match response {
679 Rows(_) => &self.rows,
680 Stashed(_) => &self.rows_stashed,
681 Error(_) => &self.error,
682 Canceled => &self.canceled,
683 }
684 }
685}