1use std::borrow::Borrow;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_cluster_client::ReplicaId;
17use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
18use mz_compute_types::ComputeInstanceId;
19use mz_ore::cast::CastFrom;
20use mz_ore::metric;
21use mz_ore::metrics::raw::UIntGaugeVec;
22use mz_ore::metrics::{
23 CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec,
24 IntCounterVec, MetricVecExt, MetricVisibility, MetricsRegistry,
25};
26use mz_ore::stats::histogram_seconds_buckets;
27use mz_repr::GlobalId;
28use mz_service::transport;
29use prometheus::core::{AtomicF64, AtomicU64};
30
31use crate::protocol::command::ComputeCommand;
32use crate::protocol::response::{ComputeResponse, PeekResponse};
33
/// Float-valued (`AtomicF64`) `DeleteOnDropCounter` whose labels are a `Vec<String>`.
pub(crate) type Counter = DeleteOnDropCounter<AtomicF64, Vec<String>>;
/// Integer-valued (`AtomicU64`) `DeleteOnDropCounter` whose labels are a `Vec<String>`.
pub(crate) type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
/// Unsigned-integer (`AtomicU64`) `DeleteOnDropGauge` whose labels are a `Vec<String>`.
pub(crate) type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
/// `DeleteOnDropHistogram` whose labels are a `Vec<String>`.
type Histogram = DeleteOnDropHistogram<Vec<String>>;
38
/// Metric vectors of the compute controller.
///
/// Every vector is registered in [`ComputeControllerMetrics::new`]. Handles for
/// concrete label values are derived from these vectors by
/// [`ComputeControllerMetrics::for_instance`] and [`InstanceMetrics::for_replica`].
#[derive(Debug, Clone)]
pub struct ComputeControllerMetrics {
    // Compute protocol traffic.
    /// `mz_compute_commands_total`, labeled by instance, replica and command type.
    commands_total: IntCounterVec,
    /// `mz_compute_command_message_bytes_total`, labeled by instance and replica.
    command_message_bytes_total: IntCounterVec,
    /// `mz_compute_responses_total`, labeled by instance, replica and response type.
    responses_total: IntCounterVec,
    /// `mz_compute_response_message_bytes_total`, labeled by instance and replica.
    response_message_bytes_total: IntCounterVec,

    // Controller state.
    /// `mz_compute_controller_replica_count`, labeled by instance.
    replica_count: UIntGaugeVec,
    /// `mz_compute_controller_collection_count`, labeled by instance.
    collection_count: UIntGaugeVec,
    /// `mz_compute_controller_collection_unscheduled_count`, labeled by instance.
    collection_unscheduled_count: UIntGaugeVec,
    /// `mz_compute_controller_peek_count` (pending peeks), labeled by instance.
    peek_count: UIntGaugeVec,
    /// `mz_compute_controller_subscribe_count` (active subscribes), labeled by instance.
    subscribe_count: UIntGaugeVec,
    /// `mz_compute_controller_copy_to_count` (active copy tos), labeled by instance.
    copy_to_count: UIntGaugeVec,
    /// `mz_compute_controller_command_queue_size`, labeled by instance and replica.
    command_queue_size: UIntGaugeVec,
    /// `mz_compute_controller_response_send_count`, labeled by instance.
    response_send_count: IntCounterVec,
    /// `mz_compute_controller_response_recv_count`, labeled by instance.
    response_recv_count: IntCounterVec,
    /// `mz_compute_controller_hydration_queue_size`, labeled by instance and replica.
    hydration_queue_size: UIntGaugeVec,

    // Command history.
    /// `mz_compute_controller_history_command_count`, labeled by instance and command type.
    history_command_count: UIntGaugeVec,
    /// `mz_compute_controller_history_dataflow_count`, labeled by instance.
    history_dataflow_count: UIntGaugeVec,

    // Peeks.
    /// `mz_compute_peeks_total`, labeled by instance and peek result.
    peeks_total: IntCounterVec,
    /// `mz_compute_peek_duration_seconds`, labeled by instance and peek result.
    peek_duration_seconds: HistogramVec,

    // Replica connections.
    /// `mz_compute_controller_connected_replica_count`, labeled by instance.
    connected_replica_count: UIntGaugeVec,
    /// `mz_compute_controller_replica_connects_total`, labeled by instance and replica.
    replica_connects_total: IntCounterVec,
    /// `mz_compute_controller_replica_connect_wait_time_seconds_total`, labeled by instance
    /// and replica.
    replica_connect_wait_time_seconds_total: CounterVec,

    /// Shared controller metrics passed to `new`; used to create per-collection wallclock
    /// lag metrics.
    shared: ControllerMetrics,
}
76
77impl ComputeControllerMetrics {
78 pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
80 ComputeControllerMetrics {
81 commands_total: metrics_registry.register(metric!(
82 name: "mz_compute_commands_total",
83 help: "The total number of compute commands sent.",
84 var_labels: ["instance_id", "replica_id", "command_type"],
85 visibility: MetricVisibility::Public,
86 )),
87 command_message_bytes_total: metrics_registry.register(metric!(
88 name: "mz_compute_command_message_bytes_total",
89 help: "The total number of bytes sent in compute command messages.",
90 var_labels: ["instance_id", "replica_id"],
91 )),
92 responses_total: metrics_registry.register(metric!(
93 name: "mz_compute_responses_total",
94 help: "The total number of compute responses sent.",
95 var_labels: ["instance_id", "replica_id", "response_type"],
96 )),
97 response_message_bytes_total: metrics_registry.register(metric!(
98 name: "mz_compute_response_message_bytes_total",
99 help: "The total number of bytes sent in compute response messages.",
100 var_labels: ["instance_id", "replica_id"],
101 )),
102 replica_count: metrics_registry.register(metric!(
103 name: "mz_compute_controller_replica_count",
104 help: "The number of replicas.",
105 var_labels: ["instance_id"],
106 )),
107 collection_count: metrics_registry.register(metric!(
108 name: "mz_compute_controller_collection_count",
109 help: "The number of installed compute collections.",
110 var_labels: ["instance_id"],
111 )),
112 collection_unscheduled_count: metrics_registry.register(metric!(
113 name: "mz_compute_controller_collection_unscheduled_count",
114 help: "The number of installed but unscheduled compute collections.",
115 var_labels: ["instance_id"],
116 )),
117 peek_count: metrics_registry.register(metric!(
118 name: "mz_compute_controller_peek_count",
119 help: "The number of pending peeks.",
120 var_labels: ["instance_id"],
121 )),
122 subscribe_count: metrics_registry.register(metric!(
123 name: "mz_compute_controller_subscribe_count",
124 help: "The number of active subscribes.",
125 var_labels: ["instance_id"],
126 )),
127 copy_to_count: metrics_registry.register(metric!(
128 name: "mz_compute_controller_copy_to_count",
129 help: "The number of active copy tos.",
130 var_labels: ["instance_id"],
131 )),
132 command_queue_size: metrics_registry.register(metric!(
133 name: "mz_compute_controller_command_queue_size",
134 help: "The size of the compute command queue.",
135 var_labels: ["instance_id", "replica_id"],
136 )),
137 response_send_count: metrics_registry.register(metric!(
138 name: "mz_compute_controller_response_send_count",
139 help: "The number of sends on the compute response queue.",
140 var_labels: ["instance_id"],
141 )),
142 response_recv_count: metrics_registry.register(metric!(
143 name: "mz_compute_controller_response_recv_count",
144 help: "The number of receives on the compute response queue.",
145 var_labels: ["instance_id"],
146 )),
147 hydration_queue_size: metrics_registry.register(metric!(
148 name: "mz_compute_controller_hydration_queue_size",
149 help: "The size of the compute hydration queue.",
150 var_labels: ["instance_id", "replica_id"],
151 visibility: MetricVisibility::Public,
152 )),
153 history_command_count: metrics_registry.register(metric!(
154 name: "mz_compute_controller_history_command_count",
155 help: "The number of commands in the controller's command history.",
156 var_labels: ["instance_id", "command_type"],
157 )),
158 history_dataflow_count: metrics_registry.register(metric!(
159 name: "mz_compute_controller_history_dataflow_count",
160 help: "The number of dataflows in the controller's command history.",
161 var_labels: ["instance_id"],
162 )),
163 peeks_total: metrics_registry.register(metric!(
164 name: "mz_compute_peeks_total",
165 help: "The total number of peeks served.",
166 var_labels: ["instance_id", "result"],
167 )),
168 peek_duration_seconds: metrics_registry.register(metric!(
169 name: "mz_compute_peek_duration_seconds",
170 help: "A histogram of peek durations since restart.",
171 var_labels: ["instance_id", "result"],
172 buckets: histogram_seconds_buckets(0.000_500, 32.),
173 visibility: MetricVisibility::Public,
174 )),
175 connected_replica_count: metrics_registry.register(metric!(
176 name: "mz_compute_controller_connected_replica_count",
177 help: "The number of replicas successfully connected to the compute controller.",
178 var_labels: ["instance_id"],
179 )),
180 replica_connects_total: metrics_registry.register(metric!(
181 name: "mz_compute_controller_replica_connects_total",
182 help: "The total number of replica (re-)connections made by the compute controller.",
183 var_labels: ["instance_id", "replica_id"],
184 )),
185 replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
186 name: "mz_compute_controller_replica_connect_wait_time_seconds_total",
187 help: "The total time the compute controller spent waiting for replica (re-)connection.",
188 var_labels: ["instance_id", "replica_id"],
189 )),
190
191 shared,
192 }
193 }
194
195 pub fn for_instance(&self, instance_id: ComputeInstanceId) -> InstanceMetrics {
197 let labels = vec![instance_id.to_string()];
198 let replica_count = self.replica_count.get_delete_on_drop_metric(labels.clone());
199 let collection_count = self
200 .collection_count
201 .get_delete_on_drop_metric(labels.clone());
202 let collection_unscheduled_count = self
203 .collection_unscheduled_count
204 .get_delete_on_drop_metric(labels.clone());
205 let peek_count = self.peek_count.get_delete_on_drop_metric(labels.clone());
206 let subscribe_count = self
207 .subscribe_count
208 .get_delete_on_drop_metric(labels.clone());
209 let copy_to_count = self.copy_to_count.get_delete_on_drop_metric(labels.clone());
210 let history_command_count = CommandMetrics::build(|typ| {
211 let labels = labels.iter().cloned().chain([typ.into()]).collect();
212 self.history_command_count.get_delete_on_drop_metric(labels)
213 });
214 let history_dataflow_count = self
215 .history_dataflow_count
216 .get_delete_on_drop_metric(labels.clone());
217 let peeks_total = PeekMetrics::build(|typ| {
218 let labels = labels.iter().cloned().chain([typ.into()]).collect();
219 self.peeks_total.get_delete_on_drop_metric(labels)
220 });
221 let peek_duration_seconds = PeekMetrics::build(|typ| {
222 let labels = labels.iter().cloned().chain([typ.into()]).collect();
223 self.peek_duration_seconds.get_delete_on_drop_metric(labels)
224 });
225 let response_send_count = self
226 .response_send_count
227 .get_delete_on_drop_metric(labels.clone());
228 let response_recv_count = self
229 .response_recv_count
230 .get_delete_on_drop_metric(labels.clone());
231 let connected_replica_count = self
232 .connected_replica_count
233 .get_delete_on_drop_metric(labels);
234
235 InstanceMetrics {
236 instance_id,
237 metrics: self.clone(),
238 replica_count,
239 collection_count,
240 collection_unscheduled_count,
241 copy_to_count,
242 peek_count,
243 subscribe_count,
244 history_command_count,
245 history_dataflow_count,
246 peeks_total,
247 peek_duration_seconds,
248 response_send_count,
249 response_recv_count,
250 connected_replica_count,
251 }
252 }
253}
254
/// Metric handles for a single compute instance, created by
/// [`ComputeControllerMetrics::for_instance`].
#[derive(Debug)]
pub struct InstanceMetrics {
    /// ID of the compute instance; its string form is the `instance_id` label value.
    instance_id: ComputeInstanceId,
    /// Clone of the controller-wide metric vectors, used to derive per-replica and
    /// history metrics.
    metrics: ComputeControllerMetrics,

    /// Gauge tracking the number of replicas.
    pub replica_count: UIntGauge,
    /// Gauge tracking the number of installed compute collections.
    pub collection_count: UIntGauge,
    /// Gauge tracking the number of installed but unscheduled compute collections.
    pub collection_unscheduled_count: UIntGauge,
    /// Gauge tracking the number of pending peeks.
    pub peek_count: UIntGauge,
    /// Gauge tracking the number of active subscribes.
    pub subscribe_count: UIntGauge,
    /// Gauge tracking the number of active copy tos.
    pub copy_to_count: UIntGauge,
    /// Gauges tracking the number of commands in the command history, per command type.
    pub history_command_count: CommandMetrics<UIntGauge>,
    /// Gauge tracking the number of dataflows in the command history.
    pub history_dataflow_count: UIntGauge,
    /// Counters tracking the total number of peeks served, per peek result.
    pub peeks_total: PeekMetrics<IntCounter>,
    /// Histograms tracking peek durations in seconds, per peek result.
    pub peek_duration_seconds: PeekMetrics<Histogram>,
    /// Counter tracking the number of sends on the compute response queue.
    pub response_send_count: IntCounter,
    /// Counter tracking the number of receives on the compute response queue.
    pub response_recv_count: IntCounter,
    /// Gauge tracking the number of replicas connected to the compute controller.
    pub connected_replica_count: UIntGauge,
}
288
289impl InstanceMetrics {
290 pub fn for_replica(&self, replica_id: ReplicaId) -> ReplicaMetrics {
292 let labels = vec![self.instance_id.to_string(), replica_id.to_string()];
293 let extended_labels = |extra: &str| {
294 labels
295 .iter()
296 .cloned()
297 .chain([extra.into()])
298 .collect::<Vec<_>>()
299 };
300
301 let commands_total = CommandMetrics::build(|typ| {
302 let labels = extended_labels(typ);
303 self.metrics
304 .commands_total
305 .get_delete_on_drop_metric(labels)
306 });
307 let responses_total = ResponseMetrics::build(|typ| {
308 let labels = extended_labels(typ);
309 self.metrics
310 .responses_total
311 .get_delete_on_drop_metric(labels)
312 });
313
314 let command_message_bytes_total = self
315 .metrics
316 .command_message_bytes_total
317 .get_delete_on_drop_metric(labels.clone());
318 let response_message_bytes_total = self
319 .metrics
320 .response_message_bytes_total
321 .get_delete_on_drop_metric(labels.clone());
322
323 let command_queue_size = self
324 .metrics
325 .command_queue_size
326 .get_delete_on_drop_metric(labels.clone());
327 let hydration_queue_size = self
328 .metrics
329 .hydration_queue_size
330 .get_delete_on_drop_metric(labels.clone());
331
332 let replica_connects_total = self
333 .metrics
334 .replica_connects_total
335 .get_delete_on_drop_metric(labels.clone());
336 let replica_connect_wait_time_seconds_total = self
337 .metrics
338 .replica_connect_wait_time_seconds_total
339 .get_delete_on_drop_metric(labels);
340
341 ReplicaMetrics {
342 instance_id: self.instance_id,
343 replica_id,
344 metrics: self.metrics.clone(),
345 inner: Arc::new(ReplicaMetricsInner {
346 commands_total,
347 command_message_bytes_total,
348 responses_total,
349 response_message_bytes_total,
350 command_queue_size,
351 hydration_queue_size,
352 replica_connects_total,
353 replica_connect_wait_time_seconds_total,
354 }),
355 }
356 }
357
358 pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
360 let labels = vec![self.instance_id.to_string()];
361 let command_counts = CommandMetrics::build(|typ| {
362 let labels = labels.iter().cloned().chain([typ.into()]).collect();
363 self.metrics
364 .history_command_count
365 .get_delete_on_drop_metric(labels)
366 });
367 let dataflow_count = self
368 .metrics
369 .history_dataflow_count
370 .get_delete_on_drop_metric(labels);
371
372 HistoryMetrics {
373 command_counts,
374 dataflow_count,
375 }
376 }
377
378 pub fn observe_peek_response(&self, response: &PeekResponse, duration: Duration) {
380 self.peeks_total.for_peek_response(response).inc();
381 self.peek_duration_seconds
382 .for_peek_response(response)
383 .observe(duration.as_secs_f64());
384 }
385}
386
/// Metric handles for a single replica, created by [`InstanceMetrics::for_replica`].
///
/// The handles themselves live behind an `Arc`, so clones of this struct share them.
#[derive(Debug, Clone)]
pub struct ReplicaMetrics {
    /// ID of the compute instance the replica belongs to.
    instance_id: ComputeInstanceId,
    /// ID of the replica.
    replica_id: ReplicaId,
    /// Clone of the controller-wide metric vectors, used to derive per-collection metrics.
    metrics: ComputeControllerMetrics,

    /// The shared per-replica metric handles.
    pub inner: Arc<ReplicaMetricsInner>,
}
397
/// The per-replica metric handles shared between clones of [`ReplicaMetrics`].
#[derive(Debug)]
pub struct ReplicaMetricsInner {
    /// Counters of compute commands, per command type.
    commands_total: CommandMetrics<IntCounter>,
    /// Counter of bytes in compute command messages.
    command_message_bytes_total: IntCounter,
    /// Counters of compute responses, per response type.
    responses_total: ResponseMetrics<IntCounter>,
    /// Counter of bytes in compute response messages.
    response_message_bytes_total: IntCounter,

    /// Gauge tracking the size of the compute command queue.
    pub command_queue_size: UIntGauge,
    /// Gauge tracking the size of the compute hydration queue.
    pub hydration_queue_size: UIntGauge,

    /// Counter of replica (re-)connections made by the compute controller.
    replica_connects_total: IntCounter,
    /// Counter of the total time, in seconds, spent waiting for replica (re-)connection.
    replica_connect_wait_time_seconds_total: Counter,
}
416
417impl ReplicaMetrics {
418 pub(crate) fn for_collection(
419 &self,
420 collection_id: GlobalId,
421 ) -> Option<ReplicaCollectionMetrics> {
422 if collection_id.is_transient() {
427 return None;
428 }
429
430 let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
431 collection_id.to_string(),
432 Some(self.instance_id.to_string()),
433 Some(self.replica_id.to_string()),
434 );
435
436 Some(ReplicaCollectionMetrics { wallclock_lag })
437 }
438
439 pub(crate) fn observe_connect(&self) {
441 self.inner.replica_connects_total.inc();
442 }
443
444 pub(crate) fn observe_connect_time(&self, wait_time: Duration) {
446 self.inner
447 .replica_connect_wait_time_seconds_total
448 .inc_by(wait_time.as_secs_f64());
449 }
450}
451
452impl transport::Metrics<ComputeCommand, ComputeResponse> for ReplicaMetrics {
453 fn bytes_sent(&mut self, len: usize) {
454 self.inner
455 .command_message_bytes_total
456 .inc_by(u64::cast_from(len));
457 }
458
459 fn bytes_received(&mut self, len: usize) {
460 self.inner
461 .response_message_bytes_total
462 .inc_by(u64::cast_from(len));
463 }
464
465 fn message_sent(&mut self, msg: &ComputeCommand) {
466 self.inner.commands_total.for_command(msg).inc();
467 }
468
469 fn message_received(&mut self, msg: &ComputeResponse) {
470 self.inner.responses_total.for_response(msg).inc();
471 }
472}
473
/// Metrics for one collection on one replica, created by
/// [`ReplicaMetrics::for_collection`].
#[derive(Debug)]
pub(crate) struct ReplicaCollectionMetrics {
    /// Wallclock lag metrics obtained from the shared controller metrics.
    pub wallclock_lag: WallclockLagMetrics,
}
480
/// One metric of type `M` for each type of compute command.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metric for `Hello` commands.
    pub hello: M,
    /// Metric for `CreateInstance` commands.
    pub create_instance: M,
    /// Metric for `CreateDataflow` commands.
    pub create_dataflow: M,
    /// Metric for `Schedule` commands.
    pub schedule: M,
    /// Metric for `AllowCompaction` commands.
    pub allow_compaction: M,
    /// Metric for `Peek` commands.
    pub peek: M,
    /// Metric for `CancelPeek` commands.
    pub cancel_peek: M,
    /// Metric for `InitializationComplete` commands.
    pub initialization_complete: M,
    /// Metric for `UpdateConfiguration` commands.
    pub update_configuration: M,
    /// Metric for `AllowWrites` commands.
    pub allow_writes: M,
}
505
506impl<M> CommandMetrics<M> {
507 pub fn build<F>(build_metric: F) -> Self
509 where
510 F: Fn(&str) -> M,
511 {
512 Self {
513 hello: build_metric("hello"),
514 create_instance: build_metric("create_instance"),
515 create_dataflow: build_metric("create_dataflow"),
516 schedule: build_metric("schedule"),
517 allow_compaction: build_metric("allow_compaction"),
518 peek: build_metric("peek"),
519 cancel_peek: build_metric("cancel_peek"),
520 initialization_complete: build_metric("initialization_complete"),
521 update_configuration: build_metric("update_configuration"),
522 allow_writes: build_metric("allow_writes"),
523 }
524 }
525
526 fn for_all<F>(&self, f: F)
527 where
528 F: Fn(&M),
529 {
530 f(&self.hello);
531 f(&self.create_instance);
532 f(&self.initialization_complete);
533 f(&self.update_configuration);
534 f(&self.create_dataflow);
535 f(&self.schedule);
536 f(&self.allow_compaction);
537 f(&self.peek);
538 f(&self.cancel_peek);
539 f(&self.allow_writes);
540 }
541
542 pub fn for_command(&self, command: &ComputeCommand) -> &M {
544 use ComputeCommand::*;
545
546 match command {
547 Hello { .. } => &self.hello,
548 CreateInstance(_) => &self.create_instance,
549 InitializationComplete => &self.initialization_complete,
550 UpdateConfiguration(_) => &self.update_configuration,
551 CreateDataflow(_) => &self.create_dataflow,
552 Schedule(_) => &self.schedule,
553 AllowCompaction { .. } => &self.allow_compaction,
554 Peek(_) => &self.peek,
555 CancelPeek { .. } => &self.cancel_peek,
556 AllowWrites { .. } => &self.allow_writes,
557 }
558 }
559}
560
/// One metric of type `M` for each type of compute response.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `Frontiers` responses.
    frontiers: M,
    /// Metric for `PeekResponse` responses.
    peek_response: M,
    /// Metric for `SubscribeResponse` responses.
    subscribe_response: M,
    /// Metric for `CopyToResponse` responses.
    copy_to_response: M,
    /// Metric for `Status` responses.
    status: M,
}
570
571impl<M> ResponseMetrics<M> {
572 fn build<F>(build_metric: F) -> Self
573 where
574 F: Fn(&str) -> M,
575 {
576 Self {
577 frontiers: build_metric("frontiers"),
578 peek_response: build_metric("peek_response"),
579 subscribe_response: build_metric("subscribe_response"),
580 copy_to_response: build_metric("copy_to_response"),
581 status: build_metric("status"),
582 }
583 }
584
585 fn for_response(&self, response: &ComputeResponse) -> &M {
586 use ComputeResponse::*;
587
588 match response {
589 Frontiers(..) => &self.frontiers,
590 PeekResponse(..) => &self.peek_response,
591 SubscribeResponse(..) => &self.subscribe_response,
592 CopyToResponse(..) => &self.copy_to_response,
593 Status(..) => &self.status,
594 }
595 }
596}
597
/// Metrics describing the contents of a command history.
#[derive(Debug)]
pub struct HistoryMetrics<G> {
    /// Number of commands in the history, per command type.
    pub command_counts: CommandMetrics<G>,
    /// Number of dataflows in the history.
    pub dataflow_count: G,
}
606
607impl<G> HistoryMetrics<G>
608where
609 G: Borrow<mz_ore::metrics::UIntGauge>,
610{
611 pub fn reset(&self) {
613 self.command_counts.for_all(|m| m.borrow().set(0));
614 self.dataflow_count.borrow().set(0);
615 }
616}
617
/// One metric of type `M` for each kind of peek result.
#[derive(Debug)]
pub struct PeekMetrics<M> {
    /// Metric for peeks answered with `PeekResponse::Rows`.
    rows: M,
    /// Metric for peeks answered with `PeekResponse::Stashed`.
    rows_stashed: M,
    /// Metric for peeks answered with `PeekResponse::Error`.
    error: M,
    /// Metric for peeks answered with `PeekResponse::Canceled`.
    canceled: M,
}
626
627impl<M> PeekMetrics<M> {
628 fn build<F>(build_metric: F) -> Self
629 where
630 F: Fn(&str) -> M,
631 {
632 Self {
633 rows: build_metric("rows"),
634 rows_stashed: build_metric("rows_stashed"),
635 error: build_metric("error"),
636 canceled: build_metric("canceled"),
637 }
638 }
639
640 fn for_peek_response(&self, response: &PeekResponse) -> &M {
641 use PeekResponse::*;
642
643 match response {
644 Rows(_) => &self.rows,
645 Stashed(_) => &self.rows_stashed,
646 Error(_) => &self.error,
647 Canceled => &self.canceled,
648 }
649 }
650}