1use std::borrow::Borrow;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_cluster_client::ReplicaId;
17use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
18use mz_compute_types::ComputeInstanceId;
19use mz_ore::cast::CastFrom;
20use mz_ore::metric;
21use mz_ore::metrics::raw::UIntGaugeVec;
22use mz_ore::metrics::{
23 CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec,
24 IntCounterVec, MetricVecExt, MetricsRegistry,
25};
26use mz_ore::stats::histogram_seconds_buckets;
27use mz_repr::GlobalId;
28use mz_service::codec::StatsCollector;
29use prometheus::core::{AtomicF64, AtomicU64};
30
31use crate::protocol::command::{ComputeCommand, ProtoComputeCommand};
32use crate::protocol::response::{PeekResponse, ProtoComputeResponse};
33
/// Labeled float counter handle (`DeleteOnDropCounter` over `AtomicF64`) keyed by its
/// `Vec<String>` label values.
pub(crate) type Counter = DeleteOnDropCounter<AtomicF64, Vec<String>>;
/// Labeled integer counter handle (`DeleteOnDropCounter` over `AtomicU64`) keyed by its
/// `Vec<String>` label values.
pub(crate) type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
/// Labeled unsigned gauge handle (`DeleteOnDropGauge` over `AtomicU64`) keyed by its
/// `Vec<String>` label values.
pub(crate) type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
/// Labeled histogram handle keyed by its `Vec<String>` label values.
type Histogram = DeleteOnDropHistogram<Vec<String>>;
38
/// Metric vectors registered by the compute controller.
///
/// Per-instance and per-replica handles are obtained from these vectors through
/// [`ComputeControllerMetrics::for_instance`] and [`InstanceMetrics::for_replica`].
#[derive(Debug, Clone)]
pub struct ComputeControllerMetrics {
    // Protocol traffic, labeled by `instance_id`, `replica_id`, and command/response type.
    /// `mz_compute_commands_total`: number of compute commands sent.
    commands_total: IntCounterVec,
    /// `mz_compute_command_message_bytes_total`: bytes sent in compute command messages.
    command_message_bytes_total: IntCounterVec,
    /// `mz_compute_responses_total`: number of compute responses.
    responses_total: IntCounterVec,
    /// `mz_compute_response_message_bytes_total`: bytes in compute response messages.
    response_message_bytes_total: IntCounterVec,

    // Controller state, mostly labeled by `instance_id` only.
    /// `mz_compute_controller_replica_count`: number of replicas.
    replica_count: UIntGaugeVec,
    /// `mz_compute_controller_collection_count`: number of installed compute collections.
    collection_count: UIntGaugeVec,
    /// `mz_compute_controller_collection_unscheduled_count`: installed but unscheduled
    /// compute collections.
    collection_unscheduled_count: UIntGaugeVec,
    /// `mz_compute_controller_peek_count`: number of pending peeks.
    peek_count: UIntGaugeVec,
    /// `mz_compute_controller_subscribe_count`: number of active subscribes.
    subscribe_count: UIntGaugeVec,
    /// `mz_compute_controller_copy_to_count`: number of active copy-tos.
    copy_to_count: UIntGaugeVec,
    /// `mz_compute_controller_command_queue_size`: size of the command queue (per replica).
    command_queue_size: UIntGaugeVec,
    /// `mz_compute_controller_response_send_count`: sends on the response queue.
    response_send_count: IntCounterVec,
    /// `mz_compute_controller_response_recv_count`: receives on the response queue.
    response_recv_count: IntCounterVec,
    /// `mz_compute_controller_hydration_queue_size`: size of the hydration queue (per replica).
    hydration_queue_size: UIntGaugeVec,

    // Command history contents.
    /// `mz_compute_controller_history_command_count`: commands in the command history,
    /// labeled additionally by `command_type`.
    history_command_count: UIntGaugeVec,
    /// `mz_compute_controller_history_dataflow_count`: dataflows in the command history.
    history_dataflow_count: UIntGaugeVec,

    // Peeks, labeled by `instance_id` and `result` (`rows`, `error`, `canceled`).
    /// `mz_compute_peeks_total`: number of peeks served.
    peeks_total: IntCounterVec,
    /// `mz_compute_peek_duration_seconds`: histogram of peek durations.
    peek_duration_seconds: HistogramVec,

    // Replica connections.
    /// `mz_compute_controller_connected_replica_count`: replicas successfully connected.
    connected_replica_count: UIntGaugeVec,
    /// `mz_compute_controller_replica_connects_total`: replica (re-)connections made.
    replica_connects_total: IntCounterVec,
    /// `mz_compute_controller_replica_connect_wait_time_seconds_total`: total time spent
    /// waiting for replica (re-)connection.
    replica_connect_wait_time_seconds_total: CounterVec,

    /// Controller metrics passed in by the caller; used here to create wallclock-lag
    /// metrics in `ReplicaMetrics::for_collection`.
    shared: ControllerMetrics,
}
76
77impl ComputeControllerMetrics {
78 pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
80 ComputeControllerMetrics {
81 commands_total: metrics_registry.register(metric!(
82 name: "mz_compute_commands_total",
83 help: "The total number of compute commands sent.",
84 var_labels: ["instance_id", "replica_id", "command_type"],
85 )),
86 command_message_bytes_total: metrics_registry.register(metric!(
87 name: "mz_compute_command_message_bytes_total",
88 help: "The total number of bytes sent in compute command messages.",
89 var_labels: ["instance_id", "replica_id", "command_type"],
90 )),
91 responses_total: metrics_registry.register(metric!(
92 name: "mz_compute_responses_total",
93 help: "The total number of compute responses sent.",
94 var_labels: ["instance_id", "replica_id", "response_type"],
95 )),
96 response_message_bytes_total: metrics_registry.register(metric!(
97 name: "mz_compute_response_message_bytes_total",
98 help: "The total number of bytes sent in compute response messages.",
99 var_labels: ["instance_id", "replica_id", "response_type"],
100 )),
101 replica_count: metrics_registry.register(metric!(
102 name: "mz_compute_controller_replica_count",
103 help: "The number of replicas.",
104 var_labels: ["instance_id"],
105 )),
106 collection_count: metrics_registry.register(metric!(
107 name: "mz_compute_controller_collection_count",
108 help: "The number of installed compute collections.",
109 var_labels: ["instance_id"],
110 )),
111 collection_unscheduled_count: metrics_registry.register(metric!(
112 name: "mz_compute_controller_collection_unscheduled_count",
113 help: "The number of installed but unscheduled compute collections.",
114 var_labels: ["instance_id"],
115 )),
116 peek_count: metrics_registry.register(metric!(
117 name: "mz_compute_controller_peek_count",
118 help: "The number of pending peeks.",
119 var_labels: ["instance_id"],
120 )),
121 subscribe_count: metrics_registry.register(metric!(
122 name: "mz_compute_controller_subscribe_count",
123 help: "The number of active subscribes.",
124 var_labels: ["instance_id"],
125 )),
126 copy_to_count: metrics_registry.register(metric!(
127 name: "mz_compute_controller_copy_to_count",
128 help: "The number of active copy tos.",
129 var_labels: ["instance_id"],
130 )),
131 command_queue_size: metrics_registry.register(metric!(
132 name: "mz_compute_controller_command_queue_size",
133 help: "The size of the compute command queue.",
134 var_labels: ["instance_id", "replica_id"],
135 )),
136 response_send_count: metrics_registry.register(metric!(
137 name: "mz_compute_controller_response_send_count",
138 help: "The number of sends on the compute response queue.",
139 var_labels: ["instance_id"],
140 )),
141 response_recv_count: metrics_registry.register(metric!(
142 name: "mz_compute_controller_response_recv_count",
143 help: "The number of receives on the compute response queue.",
144 var_labels: ["instance_id"],
145 )),
146 hydration_queue_size: metrics_registry.register(metric!(
147 name: "mz_compute_controller_hydration_queue_size",
148 help: "The size of the compute hydration queue.",
149 var_labels: ["instance_id", "replica_id"],
150 )),
151 history_command_count: metrics_registry.register(metric!(
152 name: "mz_compute_controller_history_command_count",
153 help: "The number of commands in the controller's command history.",
154 var_labels: ["instance_id", "command_type"],
155 )),
156 history_dataflow_count: metrics_registry.register(metric!(
157 name: "mz_compute_controller_history_dataflow_count",
158 help: "The number of dataflows in the controller's command history.",
159 var_labels: ["instance_id"],
160 )),
161 peeks_total: metrics_registry.register(metric!(
162 name: "mz_compute_peeks_total",
163 help: "The total number of peeks served.",
164 var_labels: ["instance_id", "result"],
165 )),
166 peek_duration_seconds: metrics_registry.register(metric!(
167 name: "mz_compute_peek_duration_seconds",
168 help: "A histogram of peek durations since restart.",
169 var_labels: ["instance_id", "result"],
170 buckets: histogram_seconds_buckets(0.000_500, 32.),
171 )),
172 connected_replica_count: metrics_registry.register(metric!(
173 name: "mz_compute_controller_connected_replica_count",
174 help: "The number of replicas successfully connected to the compute controller.",
175 var_labels: ["instance_id"],
176 )),
177 replica_connects_total: metrics_registry.register(metric!(
178 name: "mz_compute_controller_replica_connects_total",
179 help: "The total number of replica (re-)connections made by the compute controller.",
180 var_labels: ["instance_id", "replica_id"],
181 )),
182 replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
183 name: "mz_compute_controller_replica_connect_wait_time_seconds_total",
184 help: "The total time the compute controller spent waiting for replica (re-)connection.",
185 var_labels: ["instance_id", "replica_id"],
186 )),
187
188 shared,
189 }
190 }
191
192 pub fn for_instance(&self, instance_id: ComputeInstanceId) -> InstanceMetrics {
194 let labels = vec![instance_id.to_string()];
195 let replica_count = self.replica_count.get_delete_on_drop_metric(labels.clone());
196 let collection_count = self
197 .collection_count
198 .get_delete_on_drop_metric(labels.clone());
199 let collection_unscheduled_count = self
200 .collection_unscheduled_count
201 .get_delete_on_drop_metric(labels.clone());
202 let peek_count = self.peek_count.get_delete_on_drop_metric(labels.clone());
203 let subscribe_count = self
204 .subscribe_count
205 .get_delete_on_drop_metric(labels.clone());
206 let copy_to_count = self.copy_to_count.get_delete_on_drop_metric(labels.clone());
207 let history_command_count = CommandMetrics::build(|typ| {
208 let labels = labels.iter().cloned().chain([typ.into()]).collect();
209 self.history_command_count.get_delete_on_drop_metric(labels)
210 });
211 let history_dataflow_count = self
212 .history_dataflow_count
213 .get_delete_on_drop_metric(labels.clone());
214 let peeks_total = PeekMetrics::build(|typ| {
215 let labels = labels.iter().cloned().chain([typ.into()]).collect();
216 self.peeks_total.get_delete_on_drop_metric(labels)
217 });
218 let peek_duration_seconds = PeekMetrics::build(|typ| {
219 let labels = labels.iter().cloned().chain([typ.into()]).collect();
220 self.peek_duration_seconds.get_delete_on_drop_metric(labels)
221 });
222 let response_send_count = self
223 .response_send_count
224 .get_delete_on_drop_metric(labels.clone());
225 let response_recv_count = self
226 .response_recv_count
227 .get_delete_on_drop_metric(labels.clone());
228 let connected_replica_count = self
229 .connected_replica_count
230 .get_delete_on_drop_metric(labels);
231
232 InstanceMetrics {
233 instance_id,
234 metrics: self.clone(),
235 replica_count,
236 collection_count,
237 collection_unscheduled_count,
238 copy_to_count,
239 peek_count,
240 subscribe_count,
241 history_command_count,
242 history_dataflow_count,
243 peeks_total,
244 peek_duration_seconds,
245 response_send_count,
246 response_recv_count,
247 connected_replica_count,
248 }
249 }
250}
251
/// Metric handles scoped to a single compute instance.
///
/// Created by [`ComputeControllerMetrics::for_instance`]; every series carries the
/// instance's `instance_id` label.
#[derive(Debug)]
pub struct InstanceMetrics {
    /// The instance these handles are labeled with.
    instance_id: ComputeInstanceId,
    /// The shared metric vectors, kept so replica and history handles can be derived later.
    metrics: ComputeControllerMetrics,

    /// Gauge `mz_compute_controller_replica_count`: number of replicas.
    pub replica_count: UIntGauge,
    /// Gauge `mz_compute_controller_collection_count`: number of installed collections.
    pub collection_count: UIntGauge,
    /// Gauge `mz_compute_controller_collection_unscheduled_count`: installed but
    /// unscheduled collections.
    pub collection_unscheduled_count: UIntGauge,
    /// Gauge `mz_compute_controller_peek_count`: number of pending peeks.
    pub peek_count: UIntGauge,
    /// Gauge `mz_compute_controller_subscribe_count`: number of active subscribes.
    pub subscribe_count: UIntGauge,
    /// Gauge `mz_compute_controller_copy_to_count`: number of active copy-tos.
    pub copy_to_count: UIntGauge,
    /// Gauges `mz_compute_controller_history_command_count`, one per command type.
    pub history_command_count: CommandMetrics<UIntGauge>,
    /// Gauge `mz_compute_controller_history_dataflow_count`: dataflows in the history.
    pub history_dataflow_count: UIntGauge,
    /// Counters `mz_compute_peeks_total`, one per peek result.
    pub peeks_total: PeekMetrics<IntCounter>,
    /// Histograms `mz_compute_peek_duration_seconds`, one per peek result.
    pub peek_duration_seconds: PeekMetrics<Histogram>,
    /// Counter `mz_compute_controller_response_send_count`: sends on the response queue.
    pub response_send_count: IntCounter,
    /// Counter `mz_compute_controller_response_recv_count`: receives on the response queue.
    pub response_recv_count: IntCounter,
    /// Gauge `mz_compute_controller_connected_replica_count`: connected replicas.
    pub connected_replica_count: UIntGauge,
}
285
286impl InstanceMetrics {
287 pub fn for_replica(&self, replica_id: ReplicaId) -> ReplicaMetrics {
289 let labels = vec![self.instance_id.to_string(), replica_id.to_string()];
290 let extended_labels = |extra: &str| {
291 labels
292 .iter()
293 .cloned()
294 .chain([extra.into()])
295 .collect::<Vec<_>>()
296 };
297
298 let commands_total = CommandMetrics::build(|typ| {
299 let labels = extended_labels(typ);
300 self.metrics
301 .commands_total
302 .get_delete_on_drop_metric(labels)
303 });
304 let command_message_bytes_total = CommandMetrics::build(|typ| {
305 let labels = extended_labels(typ);
306 self.metrics
307 .command_message_bytes_total
308 .get_delete_on_drop_metric(labels)
309 });
310 let responses_total = ResponseMetrics::build(|typ| {
311 let labels = extended_labels(typ);
312 self.metrics
313 .responses_total
314 .get_delete_on_drop_metric(labels)
315 });
316 let response_message_bytes_total = ResponseMetrics::build(|typ| {
317 let labels = extended_labels(typ);
318 self.metrics
319 .response_message_bytes_total
320 .get_delete_on_drop_metric(labels)
321 });
322
323 let command_queue_size = self
324 .metrics
325 .command_queue_size
326 .get_delete_on_drop_metric(labels.clone());
327 let hydration_queue_size = self
328 .metrics
329 .hydration_queue_size
330 .get_delete_on_drop_metric(labels.clone());
331
332 let replica_connects_total = self
333 .metrics
334 .replica_connects_total
335 .get_delete_on_drop_metric(labels.clone());
336 let replica_connect_wait_time_seconds_total = self
337 .metrics
338 .replica_connect_wait_time_seconds_total
339 .get_delete_on_drop_metric(labels);
340
341 ReplicaMetrics {
342 instance_id: self.instance_id,
343 replica_id,
344 metrics: self.metrics.clone(),
345 inner: Arc::new(ReplicaMetricsInner {
346 commands_total,
347 command_message_bytes_total,
348 responses_total,
349 response_message_bytes_total,
350 command_queue_size,
351 hydration_queue_size,
352 replica_connects_total,
353 replica_connect_wait_time_seconds_total,
354 }),
355 }
356 }
357
358 pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
360 let labels = vec![self.instance_id.to_string()];
361 let command_counts = CommandMetrics::build(|typ| {
362 let labels = labels.iter().cloned().chain([typ.into()]).collect();
363 self.metrics
364 .history_command_count
365 .get_delete_on_drop_metric(labels)
366 });
367 let dataflow_count = self
368 .metrics
369 .history_dataflow_count
370 .get_delete_on_drop_metric(labels);
371
372 HistoryMetrics {
373 command_counts,
374 dataflow_count,
375 }
376 }
377
378 pub fn observe_peek_response(&self, response: &PeekResponse, duration: Duration) {
380 self.peeks_total.for_peek_response(response).inc();
381 self.peek_duration_seconds
382 .for_peek_response(response)
383 .observe(duration.as_secs_f64());
384 }
385}
386
/// Metric handles scoped to a single replica of a compute instance.
///
/// The per-replica handles live behind the `Arc` in `inner`, so clones share them.
#[derive(Debug, Clone)]
pub struct ReplicaMetrics {
    /// The instance the replica belongs to.
    instance_id: ComputeInstanceId,
    /// The replica these handles are labeled with.
    replica_id: ReplicaId,
    /// The shared metric vectors, used to create per-collection metrics.
    metrics: ComputeControllerMetrics,

    /// The per-replica metric handles.
    pub inner: Arc<ReplicaMetricsInner>,
}
397
/// The per-replica metric handles shared by all clones of a [`ReplicaMetrics`].
#[derive(Debug)]
pub struct ReplicaMetricsInner {
    /// Counters `mz_compute_commands_total`, one per command type.
    commands_total: CommandMetrics<IntCounter>,
    /// Counters `mz_compute_command_message_bytes_total`, one per command type.
    command_message_bytes_total: CommandMetrics<IntCounter>,
    /// Counters `mz_compute_responses_total`, one per response type.
    responses_total: ResponseMetrics<IntCounter>,
    /// Counters `mz_compute_response_message_bytes_total`, one per response type.
    response_message_bytes_total: ResponseMetrics<IntCounter>,

    /// Gauge `mz_compute_controller_command_queue_size` for this replica.
    pub command_queue_size: UIntGauge,
    /// Gauge `mz_compute_controller_hydration_queue_size` for this replica.
    pub hydration_queue_size: UIntGauge,

    /// Counter `mz_compute_controller_replica_connects_total` for this replica.
    replica_connects_total: IntCounter,
    /// Counter `mz_compute_controller_replica_connect_wait_time_seconds_total` for this
    /// replica, in seconds.
    replica_connect_wait_time_seconds_total: Counter,
}
416
417impl ReplicaMetrics {
418 pub(crate) fn for_collection(
419 &self,
420 collection_id: GlobalId,
421 ) -> Option<ReplicaCollectionMetrics> {
422 if collection_id.is_transient() {
427 return None;
428 }
429
430 let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
431 collection_id.to_string(),
432 Some(self.instance_id.to_string()),
433 Some(self.replica_id.to_string()),
434 );
435
436 Some(ReplicaCollectionMetrics { wallclock_lag })
437 }
438
439 pub(crate) fn observe_connect(&self) {
441 self.inner.replica_connects_total.inc();
442 }
443
444 pub(crate) fn observe_connect_time(&self, wait_time: Duration) {
446 self.inner
447 .replica_connect_wait_time_seconds_total
448 .inc_by(wait_time.as_secs_f64());
449 }
450}
451
452impl StatsCollector<ProtoComputeCommand, ProtoComputeResponse> for ReplicaMetrics {
454 fn send_event(&self, item: &ProtoComputeCommand, size: usize) {
455 self.inner.commands_total.for_proto_command(item).inc();
456 self.inner
457 .command_message_bytes_total
458 .for_proto_command(item)
459 .inc_by(u64::cast_from(size));
460 }
461
462 fn receive_event(&self, item: &ProtoComputeResponse, size: usize) {
463 self.inner.responses_total.for_proto_response(item).inc();
464 self.inner
465 .response_message_bytes_total
466 .for_proto_response(item)
467 .inc_by(u64::cast_from(size));
468 }
469}
470
/// Metrics for one (non-transient) collection on one replica.
///
/// Produced by `ReplicaMetrics::for_collection`.
#[derive(Debug)]
pub(crate) struct ReplicaCollectionMetrics {
    /// Wallclock-lag metrics labeled with the collection, instance, and replica ids.
    pub wallclock_lag: WallclockLagMetrics,
}
477
/// One metric per compute command type.
///
/// Each field's label value (used by `CommandMetrics::build`) matches the field name.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metric for `CreateTimely` commands.
    pub create_timely: M,
    /// Metric for `CreateInstance` commands.
    pub create_instance: M,
    /// Metric for `CreateDataflow` commands.
    pub create_dataflow: M,
    /// Metric for `Schedule` commands.
    pub schedule: M,
    /// Metric for `AllowCompaction` commands.
    pub allow_compaction: M,
    /// Metric for `Peek` commands.
    pub peek: M,
    /// Metric for `CancelPeek` commands.
    pub cancel_peek: M,
    /// Metric for `InitializationComplete` commands.
    pub initialization_complete: M,
    /// Metric for `UpdateConfiguration` commands.
    pub update_configuration: M,
    /// Metric for `AllowWrites` commands.
    pub allow_writes: M,
}
502
503impl<M> CommandMetrics<M> {
504 pub fn build<F>(build_metric: F) -> Self
506 where
507 F: Fn(&str) -> M,
508 {
509 Self {
510 create_timely: build_metric("create_timely"),
511 create_instance: build_metric("create_instance"),
512 create_dataflow: build_metric("create_dataflow"),
513 schedule: build_metric("schedule"),
514 allow_compaction: build_metric("allow_compaction"),
515 peek: build_metric("peek"),
516 cancel_peek: build_metric("cancel_peek"),
517 initialization_complete: build_metric("initialization_complete"),
518 update_configuration: build_metric("update_configuration"),
519 allow_writes: build_metric("allow_writes"),
520 }
521 }
522
523 fn for_all<F>(&self, f: F)
524 where
525 F: Fn(&M),
526 {
527 f(&self.create_timely);
528 f(&self.create_instance);
529 f(&self.initialization_complete);
530 f(&self.update_configuration);
531 f(&self.create_dataflow);
532 f(&self.schedule);
533 f(&self.allow_compaction);
534 f(&self.peek);
535 f(&self.cancel_peek);
536 }
537
538 pub fn for_command<T>(&self, command: &ComputeCommand<T>) -> &M {
540 use ComputeCommand::*;
541
542 match command {
543 CreateTimely { .. } => &self.create_timely,
544 CreateInstance(_) => &self.create_instance,
545 InitializationComplete => &self.initialization_complete,
546 UpdateConfiguration(_) => &self.update_configuration,
547 CreateDataflow(_) => &self.create_dataflow,
548 Schedule(_) => &self.schedule,
549 AllowCompaction { .. } => &self.allow_compaction,
550 Peek(_) => &self.peek,
551 CancelPeek { .. } => &self.cancel_peek,
552 AllowWrites { .. } => &self.allow_writes,
553 }
554 }
555
556 fn for_proto_command(&self, proto: &ProtoComputeCommand) -> &M {
557 use crate::protocol::command::proto_compute_command::Kind::*;
558
559 match proto.kind.as_ref().unwrap() {
560 CreateTimely(_) => &self.create_timely,
561 CreateInstance(_) => &self.create_instance,
562 CreateDataflow(_) => &self.create_dataflow,
563 Schedule(_) => &self.schedule,
564 AllowCompaction(_) => &self.allow_compaction,
565 Peek(_) => &self.peek,
566 CancelPeek(_) => &self.cancel_peek,
567 InitializationComplete(_) => &self.initialization_complete,
568 UpdateConfiguration(_) => &self.update_configuration,
569 AllowWrites(_) => &self.allow_writes,
570 }
571 }
572}
573
/// One metric per compute response type.
///
/// Each field's label value (used by `ResponseMetrics::build`) matches the field name.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `Frontiers` responses.
    frontiers: M,
    /// Metric for `PeekResponse` responses.
    peek_response: M,
    /// Metric for `SubscribeResponse` responses.
    subscribe_response: M,
    /// Metric for `CopyToResponse` responses.
    copy_to_response: M,
    /// Metric for `Status` responses.
    status: M,
}
583
584impl<M> ResponseMetrics<M> {
585 fn build<F>(build_metric: F) -> Self
586 where
587 F: Fn(&str) -> M,
588 {
589 Self {
590 frontiers: build_metric("frontiers"),
591 peek_response: build_metric("peek_response"),
592 subscribe_response: build_metric("subscribe_response"),
593 copy_to_response: build_metric("copy_to_response"),
594 status: build_metric("status"),
595 }
596 }
597
598 fn for_proto_response(&self, proto: &ProtoComputeResponse) -> &M {
599 use crate::protocol::response::proto_compute_response::Kind::*;
600
601 match proto.kind.as_ref().unwrap() {
602 Frontiers(_) => &self.frontiers,
603 PeekResponse(_) => &self.peek_response,
604 SubscribeResponse(_) => &self.subscribe_response,
605 CopyToResponse(_) => &self.copy_to_response,
606 Status(_) => &self.status,
607 }
608 }
609}
610
/// Gauges describing the contents of a compute command history.
///
/// `G` is the gauge handle type; `HistoryMetrics::reset` requires it to borrow as a
/// `mz_ore::metrics::UIntGauge`.
#[derive(Debug)]
pub struct HistoryMetrics<G> {
    /// Number of commands in the history, one gauge per command type.
    pub command_counts: CommandMetrics<G>,
    /// Number of dataflows in the history.
    pub dataflow_count: G,
}
619
620impl<G> HistoryMetrics<G>
621where
622 G: Borrow<mz_ore::metrics::UIntGauge>,
623{
624 pub fn reset(&self) {
626 self.command_counts.for_all(|m| m.borrow().set(0));
627 self.dataflow_count.borrow().set(0);
628 }
629}
630
/// One metric per peek result kind (label values `rows`, `error`, `canceled`).
#[derive(Debug)]
pub struct PeekMetrics<M> {
    /// Metric for peeks that returned rows.
    rows: M,
    /// Metric for peeks that returned an error.
    error: M,
    /// Metric for peeks that were canceled.
    canceled: M,
}
638
639impl<M> PeekMetrics<M> {
640 fn build<F>(build_metric: F) -> Self
641 where
642 F: Fn(&str) -> M,
643 {
644 Self {
645 rows: build_metric("rows"),
646 error: build_metric("error"),
647 canceled: build_metric("canceled"),
648 }
649 }
650
651 fn for_peek_response(&self, response: &PeekResponse) -> &M {
652 use PeekResponse::*;
653
654 match response {
655 Rows(_) => &self.rows,
656 Error(_) => &self.error,
657 Canceled => &self.canceled,
658 }
659 }
660}