mz_compute_client/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for the compute controller components
11
12use std::borrow::Borrow;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_cluster_client::ReplicaId;
17use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
18use mz_compute_types::ComputeInstanceId;
19use mz_ore::cast::CastFrom;
20use mz_ore::metric;
21use mz_ore::metrics::raw::UIntGaugeVec;
22use mz_ore::metrics::{
23    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec,
24    IntCounterVec, MetricVecExt, MetricsRegistry,
25};
26use mz_ore::stats::histogram_seconds_buckets;
27use mz_repr::GlobalId;
28use mz_service::transport;
29use prometheus::core::{AtomicF64, AtomicU64};
30
31use crate::protocol::command::ComputeCommand;
32use crate::protocol::response::{ComputeResponse, PeekResponse};
33
/// Float-valued (`AtomicF64`) delete-on-drop counter whose label values are a `Vec<String>`.
pub(crate) type Counter = DeleteOnDropCounter<AtomicF64, Vec<String>>;
/// Integer-valued (`AtomicU64`) delete-on-drop counter whose label values are a `Vec<String>`.
pub(crate) type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
/// Unsigned-integer (`AtomicU64`) delete-on-drop gauge whose label values are a `Vec<String>`.
pub(crate) type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
/// Delete-on-drop histogram whose label values are a `Vec<String>`.
type Histogram = DeleteOnDropHistogram<Vec<String>>;
38
/// Compute controller metrics.
///
/// Holds the registered metric vectors. Labeled per-instance and per-replica handles are
/// derived from them via [`ComputeControllerMetrics::for_instance`] and
/// [`InstanceMetrics::for_replica`].
#[derive(Debug, Clone)]
pub struct ComputeControllerMetrics {
    // compute protocol
    /// Compute commands sent, by instance, replica, and command type.
    commands_total: IntCounterVec,
    /// Bytes in compute command messages, by instance and replica.
    command_message_bytes_total: IntCounterVec,
    /// Compute responses, by instance, replica, and response type.
    responses_total: IntCounterVec,
    /// Bytes in compute response messages, by instance and replica.
    response_message_bytes_total: IntCounterVec,

    // controller state
    /// Number of replicas, by instance.
    replica_count: UIntGaugeVec,
    /// Number of installed compute collections, by instance.
    collection_count: UIntGaugeVec,
    /// Number of installed but unscheduled compute collections, by instance.
    collection_unscheduled_count: UIntGaugeVec,
    /// Number of pending peeks, by instance.
    peek_count: UIntGaugeVec,
    /// Number of active subscribes, by instance.
    subscribe_count: UIntGaugeVec,
    /// Number of active COPY TO operations, by instance.
    copy_to_count: UIntGaugeVec,
    /// Size of the compute command queue, by instance and replica.
    command_queue_size: UIntGaugeVec,
    /// Sends on the compute response queue, by instance.
    response_send_count: IntCounterVec,
    /// Receives on the compute response queue, by instance.
    response_recv_count: IntCounterVec,
    /// Size of the compute hydration queue, by instance and replica.
    hydration_queue_size: UIntGaugeVec,

    // command history
    /// Commands in the controller's command history, by instance and command type.
    history_command_count: UIntGaugeVec,
    /// Dataflows in the controller's command history, by instance.
    history_dataflow_count: UIntGaugeVec,

    // peeks
    /// Peeks served, by instance and peek result.
    peeks_total: IntCounterVec,
    /// Histogram of peek durations in seconds, by instance and peek result.
    peek_duration_seconds: HistogramVec,

    // replica connections
    /// Replicas connected to the compute controller, by instance.
    connected_replica_count: UIntGaugeVec,
    /// Replica (re-)connections made, by instance and replica.
    replica_connects_total: IntCounterVec,
    /// Seconds spent waiting for replica (re-)connection, by instance and replica.
    replica_connect_wait_time_seconds_total: CounterVec,

    /// Metrics shared with the storage controller.
    shared: ControllerMetrics,
}
76
77impl ComputeControllerMetrics {
78    /// Create a metrics instance registered into the given registry.
79    pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
80        ComputeControllerMetrics {
81            commands_total: metrics_registry.register(metric!(
82                name: "mz_compute_commands_total",
83                help: "The total number of compute commands sent.",
84                var_labels: ["instance_id", "replica_id", "command_type"],
85            )),
86            command_message_bytes_total: metrics_registry.register(metric!(
87                name: "mz_compute_command_message_bytes_total",
88                help: "The total number of bytes sent in compute command messages.",
89                var_labels: ["instance_id", "replica_id"],
90            )),
91            responses_total: metrics_registry.register(metric!(
92                name: "mz_compute_responses_total",
93                help: "The total number of compute responses sent.",
94                var_labels: ["instance_id", "replica_id", "response_type"],
95            )),
96            response_message_bytes_total: metrics_registry.register(metric!(
97                name: "mz_compute_response_message_bytes_total",
98                help: "The total number of bytes sent in compute response messages.",
99                var_labels: ["instance_id", "replica_id"],
100            )),
101            replica_count: metrics_registry.register(metric!(
102                name: "mz_compute_controller_replica_count",
103                help: "The number of replicas.",
104                var_labels: ["instance_id"],
105            )),
106            collection_count: metrics_registry.register(metric!(
107                name: "mz_compute_controller_collection_count",
108                help: "The number of installed compute collections.",
109                var_labels: ["instance_id"],
110            )),
111            collection_unscheduled_count: metrics_registry.register(metric!(
112                name: "mz_compute_controller_collection_unscheduled_count",
113                help: "The number of installed but unscheduled compute collections.",
114                var_labels: ["instance_id"],
115            )),
116            peek_count: metrics_registry.register(metric!(
117                name: "mz_compute_controller_peek_count",
118                help: "The number of pending peeks.",
119                var_labels: ["instance_id"],
120            )),
121            subscribe_count: metrics_registry.register(metric!(
122                name: "mz_compute_controller_subscribe_count",
123                help: "The number of active subscribes.",
124                var_labels: ["instance_id"],
125            )),
126            copy_to_count: metrics_registry.register(metric!(
127                name: "mz_compute_controller_copy_to_count",
128                help: "The number of active copy tos.",
129                var_labels: ["instance_id"],
130            )),
131            command_queue_size: metrics_registry.register(metric!(
132                name: "mz_compute_controller_command_queue_size",
133                help: "The size of the compute command queue.",
134                var_labels: ["instance_id", "replica_id"],
135            )),
136            response_send_count: metrics_registry.register(metric!(
137                name: "mz_compute_controller_response_send_count",
138                help: "The number of sends on the compute response queue.",
139                var_labels: ["instance_id"],
140            )),
141            response_recv_count: metrics_registry.register(metric!(
142                name: "mz_compute_controller_response_recv_count",
143                help: "The number of receives on the compute response queue.",
144                var_labels: ["instance_id"],
145            )),
146            hydration_queue_size: metrics_registry.register(metric!(
147                name: "mz_compute_controller_hydration_queue_size",
148                help: "The size of the compute hydration queue.",
149                var_labels: ["instance_id", "replica_id"],
150            )),
151            history_command_count: metrics_registry.register(metric!(
152                name: "mz_compute_controller_history_command_count",
153                help: "The number of commands in the controller's command history.",
154                var_labels: ["instance_id", "command_type"],
155            )),
156            history_dataflow_count: metrics_registry.register(metric!(
157                name: "mz_compute_controller_history_dataflow_count",
158                help: "The number of dataflows in the controller's command history.",
159                var_labels: ["instance_id"],
160            )),
161            peeks_total: metrics_registry.register(metric!(
162                name: "mz_compute_peeks_total",
163                help: "The total number of peeks served.",
164                var_labels: ["instance_id", "result"],
165            )),
166            peek_duration_seconds: metrics_registry.register(metric!(
167                name: "mz_compute_peek_duration_seconds",
168                help: "A histogram of peek durations since restart.",
169                var_labels: ["instance_id", "result"],
170                buckets: histogram_seconds_buckets(0.000_500, 32.),
171            )),
172            connected_replica_count: metrics_registry.register(metric!(
173                name: "mz_compute_controller_connected_replica_count",
174                help: "The number of replicas successfully connected to the compute controller.",
175                var_labels: ["instance_id"],
176            )),
177            replica_connects_total: metrics_registry.register(metric!(
178                name: "mz_compute_controller_replica_connects_total",
179                help: "The total number of replica (re-)connections made by the compute controller.",
180                var_labels: ["instance_id", "replica_id"],
181            )),
182            replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
183                name: "mz_compute_controller_replica_connect_wait_time_seconds_total",
184                help: "The total time the compute controller spent waiting for replica (re-)connection.",
185                var_labels: ["instance_id", "replica_id"],
186            )),
187
188            shared,
189        }
190    }
191
192    /// Return an object suitable for tracking metrics for the given compute instance.
193    pub fn for_instance(&self, instance_id: ComputeInstanceId) -> InstanceMetrics {
194        let labels = vec![instance_id.to_string()];
195        let replica_count = self.replica_count.get_delete_on_drop_metric(labels.clone());
196        let collection_count = self
197            .collection_count
198            .get_delete_on_drop_metric(labels.clone());
199        let collection_unscheduled_count = self
200            .collection_unscheduled_count
201            .get_delete_on_drop_metric(labels.clone());
202        let peek_count = self.peek_count.get_delete_on_drop_metric(labels.clone());
203        let subscribe_count = self
204            .subscribe_count
205            .get_delete_on_drop_metric(labels.clone());
206        let copy_to_count = self.copy_to_count.get_delete_on_drop_metric(labels.clone());
207        let history_command_count = CommandMetrics::build(|typ| {
208            let labels = labels.iter().cloned().chain([typ.into()]).collect();
209            self.history_command_count.get_delete_on_drop_metric(labels)
210        });
211        let history_dataflow_count = self
212            .history_dataflow_count
213            .get_delete_on_drop_metric(labels.clone());
214        let peeks_total = PeekMetrics::build(|typ| {
215            let labels = labels.iter().cloned().chain([typ.into()]).collect();
216            self.peeks_total.get_delete_on_drop_metric(labels)
217        });
218        let peek_duration_seconds = PeekMetrics::build(|typ| {
219            let labels = labels.iter().cloned().chain([typ.into()]).collect();
220            self.peek_duration_seconds.get_delete_on_drop_metric(labels)
221        });
222        let response_send_count = self
223            .response_send_count
224            .get_delete_on_drop_metric(labels.clone());
225        let response_recv_count = self
226            .response_recv_count
227            .get_delete_on_drop_metric(labels.clone());
228        let connected_replica_count = self
229            .connected_replica_count
230            .get_delete_on_drop_metric(labels);
231
232        InstanceMetrics {
233            instance_id,
234            metrics: self.clone(),
235            replica_count,
236            collection_count,
237            collection_unscheduled_count,
238            copy_to_count,
239            peek_count,
240            subscribe_count,
241            history_command_count,
242            history_dataflow_count,
243            peeks_total,
244            peek_duration_seconds,
245            response_send_count,
246            response_recv_count,
247            connected_replica_count,
248        }
249    }
250}
251
/// Per-instance metrics.
///
/// Created by [`ComputeControllerMetrics::for_instance`]. Every handle here is labeled with
/// the instance's ID.
#[derive(Debug)]
pub struct InstanceMetrics {
    /// ID of the compute instance these metrics are labeled with.
    instance_id: ComputeInstanceId,
    /// Controller-wide metric vectors, used to derive per-replica and history metrics.
    metrics: ComputeControllerMetrics,

    /// Gauge tracking the number of replicas.
    pub replica_count: UIntGauge,
    /// Gauge tracking the number of installed compute collections.
    pub collection_count: UIntGauge,
    /// Gauge tracking the number of installed but unscheduled compute collections.
    pub collection_unscheduled_count: UIntGauge,
    /// Gauge tracking the number of pending peeks.
    pub peek_count: UIntGauge,
    /// Gauge tracking the number of active subscribes.
    pub subscribe_count: UIntGauge,
    /// Gauge tracking the number of active COPY TO queries.
    pub copy_to_count: UIntGauge,
    /// Gauges tracking the number of commands in the command history.
    pub history_command_count: CommandMetrics<UIntGauge>,
    /// Gauge tracking the number of dataflows in the command history.
    pub history_dataflow_count: UIntGauge,
    /// Counter tracking the total number of peeks served.
    pub peeks_total: PeekMetrics<IntCounter>,
    /// Histogram tracking peek durations.
    pub peek_duration_seconds: PeekMetrics<Histogram>,
    /// Counter tracking the number of sends on the compute response queue.
    pub response_send_count: IntCounter,
    /// Counter tracking the number of receives on the compute response queue.
    pub response_recv_count: IntCounter,
    /// Gauge tracking the number of connected replicas.
    pub connected_replica_count: UIntGauge,
}
285
286impl InstanceMetrics {
287    /// TODO(database-issues#7533): Add documentation.
288    pub fn for_replica(&self, replica_id: ReplicaId) -> ReplicaMetrics {
289        let labels = vec![self.instance_id.to_string(), replica_id.to_string()];
290        let extended_labels = |extra: &str| {
291            labels
292                .iter()
293                .cloned()
294                .chain([extra.into()])
295                .collect::<Vec<_>>()
296        };
297
298        let commands_total = CommandMetrics::build(|typ| {
299            let labels = extended_labels(typ);
300            self.metrics
301                .commands_total
302                .get_delete_on_drop_metric(labels)
303        });
304        let responses_total = ResponseMetrics::build(|typ| {
305            let labels = extended_labels(typ);
306            self.metrics
307                .responses_total
308                .get_delete_on_drop_metric(labels)
309        });
310
311        let command_message_bytes_total = self
312            .metrics
313            .command_message_bytes_total
314            .get_delete_on_drop_metric(labels.clone());
315        let response_message_bytes_total = self
316            .metrics
317            .response_message_bytes_total
318            .get_delete_on_drop_metric(labels.clone());
319
320        let command_queue_size = self
321            .metrics
322            .command_queue_size
323            .get_delete_on_drop_metric(labels.clone());
324        let hydration_queue_size = self
325            .metrics
326            .hydration_queue_size
327            .get_delete_on_drop_metric(labels.clone());
328
329        let replica_connects_total = self
330            .metrics
331            .replica_connects_total
332            .get_delete_on_drop_metric(labels.clone());
333        let replica_connect_wait_time_seconds_total = self
334            .metrics
335            .replica_connect_wait_time_seconds_total
336            .get_delete_on_drop_metric(labels);
337
338        ReplicaMetrics {
339            instance_id: self.instance_id,
340            replica_id,
341            metrics: self.metrics.clone(),
342            inner: Arc::new(ReplicaMetricsInner {
343                commands_total,
344                command_message_bytes_total,
345                responses_total,
346                response_message_bytes_total,
347                command_queue_size,
348                hydration_queue_size,
349                replica_connects_total,
350                replica_connect_wait_time_seconds_total,
351            }),
352        }
353    }
354
355    /// TODO(database-issues#7533): Add documentation.
356    pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
357        let labels = vec![self.instance_id.to_string()];
358        let command_counts = CommandMetrics::build(|typ| {
359            let labels = labels.iter().cloned().chain([typ.into()]).collect();
360            self.metrics
361                .history_command_count
362                .get_delete_on_drop_metric(labels)
363        });
364        let dataflow_count = self
365            .metrics
366            .history_dataflow_count
367            .get_delete_on_drop_metric(labels);
368
369        HistoryMetrics {
370            command_counts,
371            dataflow_count,
372        }
373    }
374
375    /// Reflect the given peek response in the metrics.
376    pub fn observe_peek_response(&self, response: &PeekResponse, duration: Duration) {
377        self.peeks_total.for_peek_response(response).inc();
378        self.peek_duration_seconds
379            .for_peek_response(response)
380            .observe(duration.as_secs_f64());
381    }
382}
383
/// Per-replica metrics.
///
/// Created by [`InstanceMetrics::for_replica`]. Clones share the same counters through the
/// `Arc` in `inner`.
#[derive(Debug, Clone)]
pub struct ReplicaMetrics {
    /// ID of the compute instance the replica belongs to.
    instance_id: ComputeInstanceId,
    /// ID of the replica these metrics are labeled with.
    replica_id: ReplicaId,
    /// Controller-wide metrics; `for_collection` reads the shared wallclock-lag metrics from it.
    metrics: ComputeControllerMetrics,

    /// Metrics counters, wrapped in an `Arc` to be shareable between threads.
    pub inner: Arc<ReplicaMetricsInner>,
}
394
/// Per-replica metrics counters.
#[derive(Debug)]
pub struct ReplicaMetricsInner {
    /// Commands by type; incremented from `transport::Metrics::message_sent`.
    commands_total: CommandMetrics<IntCounter>,
    /// Command message bytes; incremented from `transport::Metrics::bytes_sent`.
    command_message_bytes_total: IntCounter,
    /// Responses by type; incremented from `transport::Metrics::message_received`.
    responses_total: ResponseMetrics<IntCounter>,
    /// Response message bytes; incremented from `transport::Metrics::bytes_received`.
    response_message_bytes_total: IntCounter,

    /// Gauge tracking the size of the compute command queue.
    pub command_queue_size: UIntGauge,
    /// Gauge tracking the size of the hydration queue.
    pub hydration_queue_size: UIntGauge,

    /// Counter tracking the total number of (re-)connects.
    replica_connects_total: IntCounter,
    /// Counter tracking the total time spent waiting for (re-)connects.
    replica_connect_wait_time_seconds_total: Counter,
}
413
414impl ReplicaMetrics {
415    pub(crate) fn for_collection(
416        &self,
417        collection_id: GlobalId,
418    ) -> Option<ReplicaCollectionMetrics> {
419        // In an effort to reduce the cardinality of timeseries created, we collect metrics only
420        // for non-transient collections. This is roughly equivalent to "long-lived" collections,
421        // with the exception of subscribes which may or may not be long-lived. We might want to
422        // change this policy in the future to track subscribes as well.
423        if collection_id.is_transient() {
424            return None;
425        }
426
427        let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
428            collection_id.to_string(),
429            Some(self.instance_id.to_string()),
430            Some(self.replica_id.to_string()),
431        );
432
433        Some(ReplicaCollectionMetrics { wallclock_lag })
434    }
435
436    /// Observe a successful replica connection.
437    pub(crate) fn observe_connect(&self) {
438        self.inner.replica_connects_total.inc();
439    }
440
441    /// Observe time spent waiting for a replica connection.
442    pub(crate) fn observe_connect_time(&self, wait_time: Duration) {
443        self.inner
444            .replica_connect_wait_time_seconds_total
445            .inc_by(wait_time.as_secs_f64());
446    }
447}
448
449impl<T> transport::Metrics<ComputeCommand<T>, ComputeResponse<T>> for ReplicaMetrics {
450    fn bytes_sent(&mut self, len: usize) {
451        self.inner
452            .command_message_bytes_total
453            .inc_by(u64::cast_from(len));
454    }
455
456    fn bytes_received(&mut self, len: usize) {
457        self.inner
458            .response_message_bytes_total
459            .inc_by(u64::cast_from(len));
460    }
461
462    fn message_sent(&mut self, msg: &ComputeCommand<T>) {
463        self.inner.commands_total.for_command(msg).inc();
464    }
465
466    fn message_received(&mut self, msg: &ComputeResponse<T>) {
467        self.inner.responses_total.for_response(msg).inc();
468    }
469}
470
/// Per-replica-and-collection metrics.
///
/// Produced by `ReplicaMetrics::for_collection`, which only creates them for non-transient
/// collections.
#[derive(Debug)]
pub(crate) struct ReplicaCollectionMetrics {
    /// Metrics tracking dataflow wallclock lag.
    pub wallclock_lag: WallclockLagMetrics,
}
477
/// Metrics keyed by `ComputeCommand` type.
///
/// Holds one metric of type `M` per command variant. `CommandMetrics::build` creates them, and
/// `CommandMetrics::for_command` selects the one matching a given command.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metrics for `Hello`.
    pub hello: M,
    /// Metrics for `CreateInstance`.
    pub create_instance: M,
    /// Metrics for `CreateDataflow`.
    pub create_dataflow: M,
    /// Metrics for `Schedule`.
    pub schedule: M,
    /// Metrics for `AllowCompaction`.
    pub allow_compaction: M,
    /// Metrics for `Peek`.
    pub peek: M,
    /// Metrics for `CancelPeek`.
    pub cancel_peek: M,
    /// Metrics for `InitializationComplete`.
    pub initialization_complete: M,
    /// Metrics for `UpdateConfiguration`.
    pub update_configuration: M,
    /// Metrics for `AllowWrites`.
    pub allow_writes: M,
}
502
503impl<M> CommandMetrics<M> {
504    /// TODO(database-issues#7533): Add documentation.
505    pub fn build<F>(build_metric: F) -> Self
506    where
507        F: Fn(&str) -> M,
508    {
509        Self {
510            hello: build_metric("hello"),
511            create_instance: build_metric("create_instance"),
512            create_dataflow: build_metric("create_dataflow"),
513            schedule: build_metric("schedule"),
514            allow_compaction: build_metric("allow_compaction"),
515            peek: build_metric("peek"),
516            cancel_peek: build_metric("cancel_peek"),
517            initialization_complete: build_metric("initialization_complete"),
518            update_configuration: build_metric("update_configuration"),
519            allow_writes: build_metric("allow_writes"),
520        }
521    }
522
523    fn for_all<F>(&self, f: F)
524    where
525        F: Fn(&M),
526    {
527        f(&self.hello);
528        f(&self.create_instance);
529        f(&self.initialization_complete);
530        f(&self.update_configuration);
531        f(&self.create_dataflow);
532        f(&self.schedule);
533        f(&self.allow_compaction);
534        f(&self.peek);
535        f(&self.cancel_peek);
536        f(&self.allow_writes);
537    }
538
539    /// TODO(database-issues#7533): Add documentation.
540    pub fn for_command<T>(&self, command: &ComputeCommand<T>) -> &M {
541        use ComputeCommand::*;
542
543        match command {
544            Hello { .. } => &self.hello,
545            CreateInstance(_) => &self.create_instance,
546            InitializationComplete => &self.initialization_complete,
547            UpdateConfiguration(_) => &self.update_configuration,
548            CreateDataflow(_) => &self.create_dataflow,
549            Schedule(_) => &self.schedule,
550            AllowCompaction { .. } => &self.allow_compaction,
551            Peek(_) => &self.peek,
552            CancelPeek { .. } => &self.cancel_peek,
553            AllowWrites { .. } => &self.allow_writes,
554        }
555    }
556}
557
/// Metrics keyed by `ComputeResponse` type.
///
/// Holds one metric of type `M` per response variant.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `Frontiers` responses.
    frontiers: M,
    /// Metric for `PeekResponse` responses.
    peek_response: M,
    /// Metric for `SubscribeResponse` responses.
    subscribe_response: M,
    /// Metric for `CopyToResponse` responses.
    copy_to_response: M,
    /// Metric for `Status` responses.
    status: M,
}
567
568impl<M> ResponseMetrics<M> {
569    fn build<F>(build_metric: F) -> Self
570    where
571        F: Fn(&str) -> M,
572    {
573        Self {
574            frontiers: build_metric("frontiers"),
575            peek_response: build_metric("peek_response"),
576            subscribe_response: build_metric("subscribe_response"),
577            copy_to_response: build_metric("copy_to_response"),
578            status: build_metric("status"),
579        }
580    }
581
582    fn for_response<T>(&self, response: &ComputeResponse<T>) -> &M {
583        use ComputeResponse::*;
584
585        match response {
586            Frontiers(..) => &self.frontiers,
587            PeekResponse(..) => &self.peek_response,
588            SubscribeResponse(..) => &self.subscribe_response,
589            CopyToResponse(..) => &self.copy_to_response,
590            Status(..) => &self.status,
591        }
592    }
593}
594
/// Metrics tracked by the command history.
///
/// `G` is the metric handle type. `InstanceMetrics::for_history` produces
/// `HistoryMetrics<UIntGauge>`, and `reset` is available when
/// `G: Borrow<mz_ore::metrics::UIntGauge>`.
#[derive(Debug)]
pub struct HistoryMetrics<G> {
    /// Metrics tracking command counts.
    pub command_counts: CommandMetrics<G>,
    /// Metric tracking the dataflow count.
    pub dataflow_count: G,
}
603
604impl<G> HistoryMetrics<G>
605where
606    G: Borrow<mz_ore::metrics::UIntGauge>,
607{
608    /// Reset all tracked counts to 0.
609    pub fn reset(&self) {
610        self.command_counts.for_all(|m| m.borrow().set(0));
611        self.dataflow_count.borrow().set(0);
612    }
613}
614
/// Metrics for finished peeks, keyed by peek result.
///
/// Holds one metric of type `M` per `PeekResponse` variant. The `result` label values are
/// `"rows"`, `"rows_stashed"`, `"error"`, and `"canceled"`.
#[derive(Debug)]
pub struct PeekMetrics<M> {
    /// Metric for `PeekResponse::Rows`.
    rows: M,
    /// Metric for `PeekResponse::Stashed`.
    rows_stashed: M,
    /// Metric for `PeekResponse::Error`.
    error: M,
    /// Metric for `PeekResponse::Canceled`.
    canceled: M,
}
623
624impl<M> PeekMetrics<M> {
625    fn build<F>(build_metric: F) -> Self
626    where
627        F: Fn(&str) -> M,
628    {
629        Self {
630            rows: build_metric("rows"),
631            rows_stashed: build_metric("rows_stashed"),
632            error: build_metric("error"),
633            canceled: build_metric("canceled"),
634        }
635    }
636
637    fn for_peek_response(&self, response: &PeekResponse) -> &M {
638        use PeekResponse::*;
639
640        match response {
641            Rows(_) => &self.rows,
642            Stashed(_) => &self.rows_stashed,
643            Error(_) => &self.error,
644            Canceled => &self.canceled,
645        }
646    }
647}