1use std::borrow::Borrow;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_cluster_client::ReplicaId;
17use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
18use mz_compute_types::ComputeInstanceId;
19use mz_ore::cast::CastFrom;
20use mz_ore::metric;
21use mz_ore::metrics::raw::UIntGaugeVec;
22use mz_ore::metrics::{
23 CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec,
24 IntCounterVec, MetricTag, MetricVecExt, MetricVisibility, MetricsRegistry,
25};
26use mz_ore::stats::histogram_seconds_buckets;
27use mz_repr::GlobalId;
28use mz_service::transport;
29use prometheus::core::{AtomicF64, AtomicU64};
30
31use crate::protocol::command::ComputeCommand;
32use crate::protocol::response::{ComputeResponse, PeekResponse};
33
/// Float-valued counter: `DeleteOnDropCounter` over `AtomicF64`, parameterized by the
/// `Vec<String>` label values this file passes to `get_delete_on_drop_metric`.
pub(crate) type Counter = DeleteOnDropCounter<AtomicF64, Vec<String>>;
/// Integer-valued counter: `DeleteOnDropCounter` over `AtomicU64` with `Vec<String>` labels.
pub(crate) type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
/// Unsigned integer gauge: `DeleteOnDropGauge` over `AtomicU64` with `Vec<String>` labels.
pub(crate) type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
/// Histogram: `DeleteOnDropHistogram` with `Vec<String>` labels.
type Histogram = DeleteOnDropHistogram<Vec<String>>;
38
/// Metric families maintained by the compute controller.
///
/// Every field except `shared` is a labeled metric vector registered by
/// `ComputeControllerMetrics::new`. Concrete, labeled metrics are derived from
/// these vectors by `for_instance` and, from there, by
/// `InstanceMetrics::for_replica` and `InstanceMetrics::for_history`.
#[derive(Debug, Clone)]
pub struct ComputeControllerMetrics {
    // Command/response traffic, labeled by instance and replica.
    /// Total number of compute commands sent, additionally labeled by command type.
    commands_total: IntCounterVec,
    /// Total number of bytes sent in compute command messages.
    command_message_bytes_total: IntCounterVec,
    /// Total number of compute responses, additionally labeled by response type.
    responses_total: IntCounterVec,
    /// Total number of bytes sent in compute response messages.
    response_message_bytes_total: IntCounterVec,

    // Controller state. Labeled by instance unless noted otherwise.
    /// Number of replicas.
    replica_count: UIntGaugeVec,
    /// Number of installed compute collections.
    collection_count: UIntGaugeVec,
    /// Number of installed but unscheduled compute collections.
    collection_unscheduled_count: UIntGaugeVec,
    /// Number of pending peeks.
    peek_count: UIntGaugeVec,
    /// Number of active subscribes.
    subscribe_count: UIntGaugeVec,
    /// Number of active copy tos.
    copy_to_count: UIntGaugeVec,
    /// Size of the compute command queue; labeled by instance and replica.
    command_queue_size: UIntGaugeVec,
    /// Number of sends on the compute response queue.
    response_send_count: IntCounterVec,
    /// Number of receives on the compute response queue.
    response_recv_count: IntCounterVec,
    /// Size of the compute hydration queue; labeled by instance and replica.
    hydration_queue_size: UIntGaugeVec,

    // Command history.
    /// Number of commands in the controller's command history, labeled by
    /// instance and command type.
    history_command_count: UIntGaugeVec,
    /// Number of dataflows in the controller's command history, labeled by instance.
    history_dataflow_count: UIntGaugeVec,

    // Peeks, labeled by instance and result.
    /// Total number of peeks served.
    peeks_total: IntCounterVec,
    /// Histogram of peek durations, in seconds.
    peek_duration_seconds: HistogramVec,

    // Replica connections.
    /// Number of replicas successfully connected to the compute controller,
    /// labeled by instance.
    connected_replica_count: UIntGaugeVec,
    /// Total number of replica (re-)connections, labeled by instance and replica.
    replica_connects_total: IntCounterVec,
    /// Total time spent waiting for replica (re-)connection, labeled by
    /// instance and replica.
    replica_connect_wait_time_seconds_total: CounterVec,

    /// Controller metrics handed to `new` by the caller; used in this file to
    /// obtain wallclock lag metrics for collections.
    shared: ControllerMetrics,
}
76
77impl ComputeControllerMetrics {
78 pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
80 ComputeControllerMetrics {
81 commands_total: metrics_registry.register(metric!(
82 name: "mz_compute_commands_total",
83 help: "The total number of compute commands sent.",
84 var_labels: ["instance_id", "replica_id", "command_type"],
85 visibility: MetricVisibility::Public,
86 tags: [MetricTag::Compute],
87 )),
88 command_message_bytes_total: metrics_registry.register(metric!(
89 name: "mz_compute_command_message_bytes_total",
90 help: "The total number of bytes sent in compute command messages.",
91 var_labels: ["instance_id", "replica_id"],
92 )),
93 responses_total: metrics_registry.register(metric!(
94 name: "mz_compute_responses_total",
95 help: "The total number of compute responses sent.",
96 var_labels: ["instance_id", "replica_id", "response_type"],
97 )),
98 response_message_bytes_total: metrics_registry.register(metric!(
99 name: "mz_compute_response_message_bytes_total",
100 help: "The total number of bytes sent in compute response messages.",
101 var_labels: ["instance_id", "replica_id"],
102 )),
103 replica_count: metrics_registry.register(metric!(
104 name: "mz_compute_controller_replica_count",
105 help: "The number of replicas.",
106 var_labels: ["instance_id"],
107 )),
108 collection_count: metrics_registry.register(metric!(
109 name: "mz_compute_controller_collection_count",
110 help: "The number of installed compute collections.",
111 var_labels: ["instance_id"],
112 )),
113 collection_unscheduled_count: metrics_registry.register(metric!(
114 name: "mz_compute_controller_collection_unscheduled_count",
115 help: "The number of installed but unscheduled compute collections.",
116 var_labels: ["instance_id"],
117 )),
118 peek_count: metrics_registry.register(metric!(
119 name: "mz_compute_controller_peek_count",
120 help: "The number of pending peeks.",
121 var_labels: ["instance_id"],
122 )),
123 subscribe_count: metrics_registry.register(metric!(
124 name: "mz_compute_controller_subscribe_count",
125 help: "The number of active subscribes.",
126 var_labels: ["instance_id"],
127 )),
128 copy_to_count: metrics_registry.register(metric!(
129 name: "mz_compute_controller_copy_to_count",
130 help: "The number of active copy tos.",
131 var_labels: ["instance_id"],
132 )),
133 command_queue_size: metrics_registry.register(metric!(
134 name: "mz_compute_controller_command_queue_size",
135 help: "The size of the compute command queue.",
136 var_labels: ["instance_id", "replica_id"],
137 )),
138 response_send_count: metrics_registry.register(metric!(
139 name: "mz_compute_controller_response_send_count",
140 help: "The number of sends on the compute response queue.",
141 var_labels: ["instance_id"],
142 )),
143 response_recv_count: metrics_registry.register(metric!(
144 name: "mz_compute_controller_response_recv_count",
145 help: "The number of receives on the compute response queue.",
146 var_labels: ["instance_id"],
147 )),
148 hydration_queue_size: metrics_registry.register(metric!(
149 name: "mz_compute_controller_hydration_queue_size",
150 help: "The size of the compute hydration queue.",
151 var_labels: ["instance_id", "replica_id"],
152 visibility: MetricVisibility::Public,
153 tags: [MetricTag::Compute],
154 )),
155 history_command_count: metrics_registry.register(metric!(
156 name: "mz_compute_controller_history_command_count",
157 help: "The number of commands in the controller's command history.",
158 var_labels: ["instance_id", "command_type"],
159 )),
160 history_dataflow_count: metrics_registry.register(metric!(
161 name: "mz_compute_controller_history_dataflow_count",
162 help: "The number of dataflows in the controller's command history.",
163 var_labels: ["instance_id"],
164 )),
165 peeks_total: metrics_registry.register(metric!(
166 name: "mz_compute_peeks_total",
167 help: "The total number of peeks served.",
168 var_labels: ["instance_id", "result"],
169 )),
170 peek_duration_seconds: metrics_registry.register(metric!(
171 name: "mz_compute_peek_duration_seconds",
172 help: "A histogram of peek durations since restart.",
173 var_labels: ["instance_id", "result"],
174 buckets: histogram_seconds_buckets(0.000_500, 32.),
175 visibility: MetricVisibility::Public,
176 tags: [MetricTag::Compute],
177 )),
178 connected_replica_count: metrics_registry.register(metric!(
179 name: "mz_compute_controller_connected_replica_count",
180 help: "The number of replicas successfully connected to the compute controller.",
181 var_labels: ["instance_id"],
182 )),
183 replica_connects_total: metrics_registry.register(metric!(
184 name: "mz_compute_controller_replica_connects_total",
185 help: "The total number of replica (re-)connections made by the compute controller.",
186 var_labels: ["instance_id", "replica_id"],
187 )),
188 replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
189 name: "mz_compute_controller_replica_connect_wait_time_seconds_total",
190 help: "The total time the compute controller spent waiting for replica (re-)connection.",
191 var_labels: ["instance_id", "replica_id"],
192 )),
193
194 shared,
195 }
196 }
197
198 pub fn for_instance(&self, instance_id: ComputeInstanceId) -> InstanceMetrics {
200 let labels = vec![instance_id.to_string()];
201 let replica_count = self.replica_count.get_delete_on_drop_metric(labels.clone());
202 let collection_count = self
203 .collection_count
204 .get_delete_on_drop_metric(labels.clone());
205 let collection_unscheduled_count = self
206 .collection_unscheduled_count
207 .get_delete_on_drop_metric(labels.clone());
208 let peek_count = self.peek_count.get_delete_on_drop_metric(labels.clone());
209 let subscribe_count = self
210 .subscribe_count
211 .get_delete_on_drop_metric(labels.clone());
212 let copy_to_count = self.copy_to_count.get_delete_on_drop_metric(labels.clone());
213 let history_command_count = CommandMetrics::build(|typ| {
214 let labels = labels.iter().cloned().chain([typ.into()]).collect();
215 self.history_command_count.get_delete_on_drop_metric(labels)
216 });
217 let history_dataflow_count = self
218 .history_dataflow_count
219 .get_delete_on_drop_metric(labels.clone());
220 let peeks_total = PeekMetrics::build(|typ| {
221 let labels = labels.iter().cloned().chain([typ.into()]).collect();
222 self.peeks_total.get_delete_on_drop_metric(labels)
223 });
224 let peek_duration_seconds = PeekMetrics::build(|typ| {
225 let labels = labels.iter().cloned().chain([typ.into()]).collect();
226 self.peek_duration_seconds.get_delete_on_drop_metric(labels)
227 });
228 let response_send_count = self
229 .response_send_count
230 .get_delete_on_drop_metric(labels.clone());
231 let response_recv_count = self
232 .response_recv_count
233 .get_delete_on_drop_metric(labels.clone());
234 let connected_replica_count = self
235 .connected_replica_count
236 .get_delete_on_drop_metric(labels);
237
238 InstanceMetrics {
239 instance_id,
240 metrics: self.clone(),
241 replica_count,
242 collection_count,
243 collection_unscheduled_count,
244 copy_to_count,
245 peek_count,
246 subscribe_count,
247 history_command_count,
248 history_dataflow_count,
249 peeks_total,
250 peek_duration_seconds,
251 response_send_count,
252 response_recv_count,
253 connected_replica_count,
254 }
255 }
256}
257
/// Metrics of a single compute instance, as built by
/// `ComputeControllerMetrics::for_instance`.
#[derive(Debug)]
pub struct InstanceMetrics {
    /// ID of the instance; its string form is the `instance_id` label value.
    instance_id: ComputeInstanceId,
    /// The metric families from which replica and history metrics are derived.
    metrics: ComputeControllerMetrics,

    /// Gauge tracking the number of replicas.
    pub replica_count: UIntGauge,
    /// Gauge tracking the number of installed compute collections.
    pub collection_count: UIntGauge,
    /// Gauge tracking the number of installed but unscheduled compute collections.
    pub collection_unscheduled_count: UIntGauge,
    /// Gauge tracking the number of pending peeks.
    pub peek_count: UIntGauge,
    /// Gauge tracking the number of active subscribes.
    pub subscribe_count: UIntGauge,
    /// Gauge tracking the number of active copy tos.
    pub copy_to_count: UIntGauge,
    /// Gauges tracking the number of commands in the command history, per command type.
    pub history_command_count: CommandMetrics<UIntGauge>,
    /// Gauge tracking the number of dataflows in the command history.
    pub history_dataflow_count: UIntGauge,
    /// Counters tracking the total number of peeks served, per peek result.
    pub peeks_total: PeekMetrics<IntCounter>,
    /// Histograms tracking peek durations in seconds, per peek result.
    pub peek_duration_seconds: PeekMetrics<Histogram>,
    /// Counter tracking the number of sends on the compute response queue.
    pub response_send_count: IntCounter,
    /// Counter tracking the number of receives on the compute response queue.
    pub response_recv_count: IntCounter,
    /// Gauge tracking the number of replicas connected to the compute controller.
    pub connected_replica_count: UIntGauge,
}
291
292impl InstanceMetrics {
293 pub fn for_replica(&self, replica_id: ReplicaId) -> ReplicaMetrics {
295 let labels = vec![self.instance_id.to_string(), replica_id.to_string()];
296 let extended_labels = |extra: &str| {
297 labels
298 .iter()
299 .cloned()
300 .chain([extra.into()])
301 .collect::<Vec<_>>()
302 };
303
304 let commands_total = CommandMetrics::build(|typ| {
305 let labels = extended_labels(typ);
306 self.metrics
307 .commands_total
308 .get_delete_on_drop_metric(labels)
309 });
310 let responses_total = ResponseMetrics::build(|typ| {
311 let labels = extended_labels(typ);
312 self.metrics
313 .responses_total
314 .get_delete_on_drop_metric(labels)
315 });
316
317 let command_message_bytes_total = self
318 .metrics
319 .command_message_bytes_total
320 .get_delete_on_drop_metric(labels.clone());
321 let response_message_bytes_total = self
322 .metrics
323 .response_message_bytes_total
324 .get_delete_on_drop_metric(labels.clone());
325
326 let command_queue_size = self
327 .metrics
328 .command_queue_size
329 .get_delete_on_drop_metric(labels.clone());
330 let hydration_queue_size = self
331 .metrics
332 .hydration_queue_size
333 .get_delete_on_drop_metric(labels.clone());
334
335 let replica_connects_total = self
336 .metrics
337 .replica_connects_total
338 .get_delete_on_drop_metric(labels.clone());
339 let replica_connect_wait_time_seconds_total = self
340 .metrics
341 .replica_connect_wait_time_seconds_total
342 .get_delete_on_drop_metric(labels);
343
344 ReplicaMetrics {
345 instance_id: self.instance_id,
346 replica_id,
347 metrics: self.metrics.clone(),
348 inner: Arc::new(ReplicaMetricsInner {
349 commands_total,
350 command_message_bytes_total,
351 responses_total,
352 response_message_bytes_total,
353 command_queue_size,
354 hydration_queue_size,
355 replica_connects_total,
356 replica_connect_wait_time_seconds_total,
357 }),
358 }
359 }
360
361 pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
363 let labels = vec![self.instance_id.to_string()];
364 let command_counts = CommandMetrics::build(|typ| {
365 let labels = labels.iter().cloned().chain([typ.into()]).collect();
366 self.metrics
367 .history_command_count
368 .get_delete_on_drop_metric(labels)
369 });
370 let dataflow_count = self
371 .metrics
372 .history_dataflow_count
373 .get_delete_on_drop_metric(labels);
374
375 HistoryMetrics {
376 command_counts,
377 dataflow_count,
378 }
379 }
380
381 pub fn observe_peek_response(&self, response: &PeekResponse, duration: Duration) {
383 self.peeks_total.for_peek_response(response).inc();
384 self.peek_duration_seconds
385 .for_peek_response(response)
386 .observe(duration.as_secs_f64());
387 }
388}
389
/// Metrics of a single replica, as built by `InstanceMetrics::for_replica`.
///
/// The struct is `Clone`; clones share the same underlying metrics through
/// the `Arc` in `inner`.
#[derive(Debug, Clone)]
pub struct ReplicaMetrics {
    /// ID of the instance the replica belongs to.
    instance_id: ComputeInstanceId,
    /// ID of the replica.
    replica_id: ReplicaId,
    /// The metric families; used by `for_collection` to reach the shared
    /// controller metrics.
    metrics: ComputeControllerMetrics,

    /// The replica's labeled metrics.
    pub inner: Arc<ReplicaMetricsInner>,
}
400
/// The labeled metrics of a single replica, held behind an `Arc` by `ReplicaMetrics`.
#[derive(Debug)]
pub struct ReplicaMetricsInner {
    /// Counters of commands sent, per command type.
    commands_total: CommandMetrics<IntCounter>,
    /// Counter of bytes sent in command messages.
    command_message_bytes_total: IntCounter,
    /// Counters of responses received, per response type.
    responses_total: ResponseMetrics<IntCounter>,
    /// Counter of bytes received in response messages.
    response_message_bytes_total: IntCounter,

    /// Gauge tracking the size of the compute command queue.
    pub command_queue_size: UIntGauge,
    /// Gauge tracking the size of the compute hydration queue.
    pub hydration_queue_size: UIntGauge,

    /// Counter of replica (re-)connections; incremented by `observe_connect`.
    replica_connects_total: IntCounter,
    /// Counter of seconds spent waiting for replica (re-)connection; advanced
    /// by `observe_connect_time`.
    replica_connect_wait_time_seconds_total: Counter,
}
419
420impl ReplicaMetrics {
421 pub(crate) fn for_collection(
422 &self,
423 collection_id: GlobalId,
424 ) -> Option<ReplicaCollectionMetrics> {
425 if collection_id.is_transient() {
430 return None;
431 }
432
433 let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
434 collection_id.to_string(),
435 Some(self.instance_id.to_string()),
436 Some(self.replica_id.to_string()),
437 );
438
439 Some(ReplicaCollectionMetrics { wallclock_lag })
440 }
441
442 pub(crate) fn observe_connect(&self) {
444 self.inner.replica_connects_total.inc();
445 }
446
447 pub(crate) fn observe_connect_time(&self, wait_time: Duration) {
449 self.inner
450 .replica_connect_wait_time_seconds_total
451 .inc_by(wait_time.as_secs_f64());
452 }
453}
454
455impl transport::Metrics<ComputeCommand, ComputeResponse> for ReplicaMetrics {
456 fn bytes_sent(&mut self, len: usize) {
457 self.inner
458 .command_message_bytes_total
459 .inc_by(u64::cast_from(len));
460 }
461
462 fn bytes_received(&mut self, len: usize) {
463 self.inner
464 .response_message_bytes_total
465 .inc_by(u64::cast_from(len));
466 }
467
468 fn message_sent(&mut self, msg: &ComputeCommand) {
469 self.inner.commands_total.for_command(msg).inc();
470 }
471
472 fn message_received(&mut self, msg: &ComputeResponse) {
473 self.inner.responses_total.for_response(msg).inc();
474 }
475}
476
/// Metrics of a single collection on a single replica, as built by
/// `ReplicaMetrics::for_collection`.
#[derive(Debug)]
pub(crate) struct ReplicaCollectionMetrics {
    /// Wallclock lag metrics, obtained from the shared controller metrics for
    /// this (collection, instance, replica) combination.
    pub wallclock_lag: WallclockLagMetrics,
}
483
/// One metric of type `M` per compute command type.
///
/// `for_command` maps each `ComputeCommand` variant to the field of the same
/// (snake-cased) name.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metric for `Hello` commands.
    pub hello: M,
    /// Metric for `CreateInstance` commands.
    pub create_instance: M,
    /// Metric for `CreateDataflow` commands.
    pub create_dataflow: M,
    /// Metric for `Schedule` commands.
    pub schedule: M,
    /// Metric for `AllowCompaction` commands.
    pub allow_compaction: M,
    /// Metric for `Peek` commands.
    pub peek: M,
    /// Metric for `CancelPeek` commands.
    pub cancel_peek: M,
    /// Metric for `InitializationComplete` commands.
    pub initialization_complete: M,
    /// Metric for `UpdateConfiguration` commands.
    pub update_configuration: M,
    /// Metric for `AllowWrites` commands.
    pub allow_writes: M,
}
508
509impl<M> CommandMetrics<M> {
510 pub fn build<F>(build_metric: F) -> Self
512 where
513 F: Fn(&str) -> M,
514 {
515 Self {
516 hello: build_metric("hello"),
517 create_instance: build_metric("create_instance"),
518 create_dataflow: build_metric("create_dataflow"),
519 schedule: build_metric("schedule"),
520 allow_compaction: build_metric("allow_compaction"),
521 peek: build_metric("peek"),
522 cancel_peek: build_metric("cancel_peek"),
523 initialization_complete: build_metric("initialization_complete"),
524 update_configuration: build_metric("update_configuration"),
525 allow_writes: build_metric("allow_writes"),
526 }
527 }
528
529 fn for_all<F>(&self, f: F)
530 where
531 F: Fn(&M),
532 {
533 f(&self.hello);
534 f(&self.create_instance);
535 f(&self.initialization_complete);
536 f(&self.update_configuration);
537 f(&self.create_dataflow);
538 f(&self.schedule);
539 f(&self.allow_compaction);
540 f(&self.peek);
541 f(&self.cancel_peek);
542 f(&self.allow_writes);
543 }
544
545 pub fn for_command(&self, command: &ComputeCommand) -> &M {
547 use ComputeCommand::*;
548
549 match command {
550 Hello { .. } => &self.hello,
551 CreateInstance(_) => &self.create_instance,
552 InitializationComplete => &self.initialization_complete,
553 UpdateConfiguration(_) => &self.update_configuration,
554 CreateDataflow(_) => &self.create_dataflow,
555 Schedule(_) => &self.schedule,
556 AllowCompaction { .. } => &self.allow_compaction,
557 Peek(_) => &self.peek,
558 CancelPeek { .. } => &self.cancel_peek,
559 AllowWrites { .. } => &self.allow_writes,
560 }
561 }
562}
563
/// One metric of type `M` per compute response type.
///
/// `for_response` maps each `ComputeResponse` variant to its field.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `Frontiers` responses.
    frontiers: M,
    /// Metric for `PeekResponse` responses.
    peek_response: M,
    /// Metric for `SubscribeResponse` responses.
    subscribe_response: M,
    /// Metric for `CopyToResponse` responses.
    copy_to_response: M,
    /// Metric for `Status` responses.
    status: M,
}
573
574impl<M> ResponseMetrics<M> {
575 fn build<F>(build_metric: F) -> Self
576 where
577 F: Fn(&str) -> M,
578 {
579 Self {
580 frontiers: build_metric("frontiers"),
581 peek_response: build_metric("peek_response"),
582 subscribe_response: build_metric("subscribe_response"),
583 copy_to_response: build_metric("copy_to_response"),
584 status: build_metric("status"),
585 }
586 }
587
588 fn for_response(&self, response: &ComputeResponse) -> &M {
589 use ComputeResponse::*;
590
591 match response {
592 Frontiers(..) => &self.frontiers,
593 PeekResponse(..) => &self.peek_response,
594 SubscribeResponse(..) => &self.subscribe_response,
595 CopyToResponse(..) => &self.copy_to_response,
596 Status(..) => &self.status,
597 }
598 }
599}
600
/// Metrics describing a command history, generic over the gauge type `G`.
///
/// `InstanceMetrics::for_history` builds these with `G = UIntGauge`.
#[derive(Debug)]
pub struct HistoryMetrics<G> {
    /// Gauges tracking the number of commands in the history, per command type.
    pub command_counts: CommandMetrics<G>,
    /// Gauge tracking the number of dataflows in the history.
    pub dataflow_count: G,
}
609
610impl<G> HistoryMetrics<G>
611where
612 G: Borrow<mz_ore::metrics::UIntGauge>,
613{
614 pub fn reset(&self) {
616 self.command_counts.for_all(|m| m.borrow().set(0));
617 self.dataflow_count.borrow().set(0);
618 }
619}
620
/// One metric of type `M` per peek result kind.
///
/// `for_peek_response` maps each `PeekResponse` variant to its field.
#[derive(Debug)]
pub struct PeekMetrics<M> {
    /// Metric for `Rows` responses (label value `rows`).
    rows: M,
    /// Metric for `Stashed` responses (label value `rows_stashed`).
    rows_stashed: M,
    /// Metric for `Error` responses (label value `error`).
    error: M,
    /// Metric for `Canceled` responses (label value `canceled`).
    canceled: M,
}
629
630impl<M> PeekMetrics<M> {
631 fn build<F>(build_metric: F) -> Self
632 where
633 F: Fn(&str) -> M,
634 {
635 Self {
636 rows: build_metric("rows"),
637 rows_stashed: build_metric("rows_stashed"),
638 error: build_metric("error"),
639 canceled: build_metric("canceled"),
640 }
641 }
642
643 fn for_peek_response(&self, response: &PeekResponse) -> &M {
644 use PeekResponse::*;
645
646 match response {
647 Rows(_) => &self.rows,
648 Stashed(_) => &self.rows_stashed,
649 Error(_) => &self.error,
650 Canceled => &self.canceled,
651 }
652 }
653}