Skip to main content

mz_compute_client/
metrics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics for the compute controller components.

12use std::borrow::Borrow;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_cluster_client::ReplicaId;
17use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
18use mz_compute_types::ComputeInstanceId;
19use mz_ore::cast::CastFrom;
20use mz_ore::metric;
21use mz_ore::metrics::raw::UIntGaugeVec;
22use mz_ore::metrics::{
23    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec,
24    IntCounterVec, MetricVecExt, MetricVisibility, MetricsRegistry,
25};
26use mz_ore::stats::histogram_seconds_buckets;
27use mz_repr::GlobalId;
28use mz_service::transport;
29use prometheus::core::{AtomicF64, AtomicU64};
30
31use crate::protocol::command::ComputeCommand;
32use crate::protocol::response::{ComputeResponse, PeekResponse};
33
34pub(crate) type Counter = DeleteOnDropCounter<AtomicF64, Vec<String>>;
35pub(crate) type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
36pub(crate) type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
37type Histogram = DeleteOnDropHistogram<Vec<String>>;
38
39/// Compute controller metrics.
40#[derive(Debug, Clone)]
41pub struct ComputeControllerMetrics {
42    // compute protocol
43    commands_total: IntCounterVec,
44    command_message_bytes_total: IntCounterVec,
45    responses_total: IntCounterVec,
46    response_message_bytes_total: IntCounterVec,
47
48    // controller state
49    replica_count: UIntGaugeVec,
50    collection_count: UIntGaugeVec,
51    collection_unscheduled_count: UIntGaugeVec,
52    peek_count: UIntGaugeVec,
53    subscribe_count: UIntGaugeVec,
54    copy_to_count: UIntGaugeVec,
55    command_queue_size: UIntGaugeVec,
56    response_send_count: IntCounterVec,
57    response_recv_count: IntCounterVec,
58    hydration_queue_size: UIntGaugeVec,
59
60    // command history
61    history_command_count: UIntGaugeVec,
62    history_dataflow_count: UIntGaugeVec,
63
64    // peeks
65    peeks_total: IntCounterVec,
66    peek_duration_seconds: HistogramVec,
67
68    // replica connections
69    connected_replica_count: UIntGaugeVec,
70    replica_connects_total: IntCounterVec,
71    replica_connect_wait_time_seconds_total: CounterVec,
72
73    /// Metrics shared with the storage controller.
74    shared: ControllerMetrics,
75}
76
77impl ComputeControllerMetrics {
78    /// Create a metrics instance registered into the given registry.
79    pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
80        ComputeControllerMetrics {
81            commands_total: metrics_registry.register(metric!(
82                name: "mz_compute_commands_total",
83                help: "The total number of compute commands sent.",
84                var_labels: ["instance_id", "replica_id", "command_type"],
85                visibility: MetricVisibility::Public,
86            )),
87            command_message_bytes_total: metrics_registry.register(metric!(
88                name: "mz_compute_command_message_bytes_total",
89                help: "The total number of bytes sent in compute command messages.",
90                var_labels: ["instance_id", "replica_id"],
91            )),
92            responses_total: metrics_registry.register(metric!(
93                name: "mz_compute_responses_total",
94                help: "The total number of compute responses sent.",
95                var_labels: ["instance_id", "replica_id", "response_type"],
96            )),
97            response_message_bytes_total: metrics_registry.register(metric!(
98                name: "mz_compute_response_message_bytes_total",
99                help: "The total number of bytes sent in compute response messages.",
100                var_labels: ["instance_id", "replica_id"],
101            )),
102            replica_count: metrics_registry.register(metric!(
103                name: "mz_compute_controller_replica_count",
104                help: "The number of replicas.",
105                var_labels: ["instance_id"],
106            )),
107            collection_count: metrics_registry.register(metric!(
108                name: "mz_compute_controller_collection_count",
109                help: "The number of installed compute collections.",
110                var_labels: ["instance_id"],
111            )),
112            collection_unscheduled_count: metrics_registry.register(metric!(
113                name: "mz_compute_controller_collection_unscheduled_count",
114                help: "The number of installed but unscheduled compute collections.",
115                var_labels: ["instance_id"],
116            )),
117            peek_count: metrics_registry.register(metric!(
118                name: "mz_compute_controller_peek_count",
119                help: "The number of pending peeks.",
120                var_labels: ["instance_id"],
121            )),
122            subscribe_count: metrics_registry.register(metric!(
123                name: "mz_compute_controller_subscribe_count",
124                help: "The number of active subscribes.",
125                var_labels: ["instance_id"],
126            )),
127            copy_to_count: metrics_registry.register(metric!(
128                name: "mz_compute_controller_copy_to_count",
129                help: "The number of active copy tos.",
130                var_labels: ["instance_id"],
131            )),
132            command_queue_size: metrics_registry.register(metric!(
133                name: "mz_compute_controller_command_queue_size",
134                help: "The size of the compute command queue.",
135                var_labels: ["instance_id", "replica_id"],
136            )),
137            response_send_count: metrics_registry.register(metric!(
138                name: "mz_compute_controller_response_send_count",
139                help: "The number of sends on the compute response queue.",
140                var_labels: ["instance_id"],
141            )),
142            response_recv_count: metrics_registry.register(metric!(
143                name: "mz_compute_controller_response_recv_count",
144                help: "The number of receives on the compute response queue.",
145                var_labels: ["instance_id"],
146            )),
147            hydration_queue_size: metrics_registry.register(metric!(
148                name: "mz_compute_controller_hydration_queue_size",
149                help: "The size of the compute hydration queue.",
150                var_labels: ["instance_id", "replica_id"],
151                visibility: MetricVisibility::Public,
152            )),
153            history_command_count: metrics_registry.register(metric!(
154                name: "mz_compute_controller_history_command_count",
155                help: "The number of commands in the controller's command history.",
156                var_labels: ["instance_id", "command_type"],
157            )),
158            history_dataflow_count: metrics_registry.register(metric!(
159                name: "mz_compute_controller_history_dataflow_count",
160                help: "The number of dataflows in the controller's command history.",
161                var_labels: ["instance_id"],
162            )),
163            peeks_total: metrics_registry.register(metric!(
164                name: "mz_compute_peeks_total",
165                help: "The total number of peeks served.",
166                var_labels: ["instance_id", "result"],
167            )),
168            peek_duration_seconds: metrics_registry.register(metric!(
169                name: "mz_compute_peek_duration_seconds",
170                help: "A histogram of peek durations since restart.",
171                var_labels: ["instance_id", "result"],
172                buckets: histogram_seconds_buckets(0.000_500, 32.),
173                visibility: MetricVisibility::Public,
174            )),
175            connected_replica_count: metrics_registry.register(metric!(
176                name: "mz_compute_controller_connected_replica_count",
177                help: "The number of replicas successfully connected to the compute controller.",
178                var_labels: ["instance_id"],
179            )),
180            replica_connects_total: metrics_registry.register(metric!(
181                name: "mz_compute_controller_replica_connects_total",
182                help: "The total number of replica (re-)connections made by the compute controller.",
183                var_labels: ["instance_id", "replica_id"],
184            )),
185            replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
186                name: "mz_compute_controller_replica_connect_wait_time_seconds_total",
187                help: "The total time the compute controller spent waiting for replica (re-)connection.",
188                var_labels: ["instance_id", "replica_id"],
189            )),
190
191            shared,
192        }
193    }
194
195    /// Return an object suitable for tracking metrics for the given compute instance.
196    pub fn for_instance(&self, instance_id: ComputeInstanceId) -> InstanceMetrics {
197        let labels = vec![instance_id.to_string()];
198        let replica_count = self.replica_count.get_delete_on_drop_metric(labels.clone());
199        let collection_count = self
200            .collection_count
201            .get_delete_on_drop_metric(labels.clone());
202        let collection_unscheduled_count = self
203            .collection_unscheduled_count
204            .get_delete_on_drop_metric(labels.clone());
205        let peek_count = self.peek_count.get_delete_on_drop_metric(labels.clone());
206        let subscribe_count = self
207            .subscribe_count
208            .get_delete_on_drop_metric(labels.clone());
209        let copy_to_count = self.copy_to_count.get_delete_on_drop_metric(labels.clone());
210        let history_command_count = CommandMetrics::build(|typ| {
211            let labels = labels.iter().cloned().chain([typ.into()]).collect();
212            self.history_command_count.get_delete_on_drop_metric(labels)
213        });
214        let history_dataflow_count = self
215            .history_dataflow_count
216            .get_delete_on_drop_metric(labels.clone());
217        let peeks_total = PeekMetrics::build(|typ| {
218            let labels = labels.iter().cloned().chain([typ.into()]).collect();
219            self.peeks_total.get_delete_on_drop_metric(labels)
220        });
221        let peek_duration_seconds = PeekMetrics::build(|typ| {
222            let labels = labels.iter().cloned().chain([typ.into()]).collect();
223            self.peek_duration_seconds.get_delete_on_drop_metric(labels)
224        });
225        let response_send_count = self
226            .response_send_count
227            .get_delete_on_drop_metric(labels.clone());
228        let response_recv_count = self
229            .response_recv_count
230            .get_delete_on_drop_metric(labels.clone());
231        let connected_replica_count = self
232            .connected_replica_count
233            .get_delete_on_drop_metric(labels);
234
235        InstanceMetrics {
236            instance_id,
237            metrics: self.clone(),
238            replica_count,
239            collection_count,
240            collection_unscheduled_count,
241            copy_to_count,
242            peek_count,
243            subscribe_count,
244            history_command_count,
245            history_dataflow_count,
246            peeks_total,
247            peek_duration_seconds,
248            response_send_count,
249            response_recv_count,
250            connected_replica_count,
251        }
252    }
253}
254
255/// Per-instance metrics
256#[derive(Debug)]
257pub struct InstanceMetrics {
258    instance_id: ComputeInstanceId,
259    metrics: ComputeControllerMetrics,
260
261    /// Gauge tracking the number of replicas.
262    pub replica_count: UIntGauge,
263    /// Gauge tracking the number of installed compute collections.
264    pub collection_count: UIntGauge,
265    /// Gauge tracking the number of installed but unscheduled compute collections.
266    pub collection_unscheduled_count: UIntGauge,
267    /// Gauge tracking the number of pending peeks.
268    pub peek_count: UIntGauge,
269    /// Gauge tracking the number of active subscribes.
270    pub subscribe_count: UIntGauge,
271    /// Gauge tracking the number of active COPY TO queries.
272    pub copy_to_count: UIntGauge,
273    /// Gauges tracking the number of commands in the command history.
274    pub history_command_count: CommandMetrics<UIntGauge>,
275    /// Gauge tracking the number of dataflows in the command history.
276    pub history_dataflow_count: UIntGauge,
277    /// Counter tracking the total number of peeks served.
278    pub peeks_total: PeekMetrics<IntCounter>,
279    /// Histogram tracking peek durations.
280    pub peek_duration_seconds: PeekMetrics<Histogram>,
281    /// Counter tracking the number of sends on the compute response queue.
282    pub response_send_count: IntCounter,
283    /// Counter tracking the number of receives on the compute response queue.
284    pub response_recv_count: IntCounter,
285    /// Gauge tracking the number of connected replicas.
286    pub connected_replica_count: UIntGauge,
287}
288
289impl InstanceMetrics {
290    /// TODO(database-issues#7533): Add documentation.
291    pub fn for_replica(&self, replica_id: ReplicaId) -> ReplicaMetrics {
292        let labels = vec![self.instance_id.to_string(), replica_id.to_string()];
293        let extended_labels = |extra: &str| {
294            labels
295                .iter()
296                .cloned()
297                .chain([extra.into()])
298                .collect::<Vec<_>>()
299        };
300
301        let commands_total = CommandMetrics::build(|typ| {
302            let labels = extended_labels(typ);
303            self.metrics
304                .commands_total
305                .get_delete_on_drop_metric(labels)
306        });
307        let responses_total = ResponseMetrics::build(|typ| {
308            let labels = extended_labels(typ);
309            self.metrics
310                .responses_total
311                .get_delete_on_drop_metric(labels)
312        });
313
314        let command_message_bytes_total = self
315            .metrics
316            .command_message_bytes_total
317            .get_delete_on_drop_metric(labels.clone());
318        let response_message_bytes_total = self
319            .metrics
320            .response_message_bytes_total
321            .get_delete_on_drop_metric(labels.clone());
322
323        let command_queue_size = self
324            .metrics
325            .command_queue_size
326            .get_delete_on_drop_metric(labels.clone());
327        let hydration_queue_size = self
328            .metrics
329            .hydration_queue_size
330            .get_delete_on_drop_metric(labels.clone());
331
332        let replica_connects_total = self
333            .metrics
334            .replica_connects_total
335            .get_delete_on_drop_metric(labels.clone());
336        let replica_connect_wait_time_seconds_total = self
337            .metrics
338            .replica_connect_wait_time_seconds_total
339            .get_delete_on_drop_metric(labels);
340
341        ReplicaMetrics {
342            instance_id: self.instance_id,
343            replica_id,
344            metrics: self.metrics.clone(),
345            inner: Arc::new(ReplicaMetricsInner {
346                commands_total,
347                command_message_bytes_total,
348                responses_total,
349                response_message_bytes_total,
350                command_queue_size,
351                hydration_queue_size,
352                replica_connects_total,
353                replica_connect_wait_time_seconds_total,
354            }),
355        }
356    }
357
358    /// TODO(database-issues#7533): Add documentation.
359    pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
360        let labels = vec![self.instance_id.to_string()];
361        let command_counts = CommandMetrics::build(|typ| {
362            let labels = labels.iter().cloned().chain([typ.into()]).collect();
363            self.metrics
364                .history_command_count
365                .get_delete_on_drop_metric(labels)
366        });
367        let dataflow_count = self
368            .metrics
369            .history_dataflow_count
370            .get_delete_on_drop_metric(labels);
371
372        HistoryMetrics {
373            command_counts,
374            dataflow_count,
375        }
376    }
377
378    /// Reflect the given peek response in the metrics.
379    pub fn observe_peek_response(&self, response: &PeekResponse, duration: Duration) {
380        self.peeks_total.for_peek_response(response).inc();
381        self.peek_duration_seconds
382            .for_peek_response(response)
383            .observe(duration.as_secs_f64());
384    }
385}
386
387/// Per-replica metrics.
388#[derive(Debug, Clone)]
389pub struct ReplicaMetrics {
390    instance_id: ComputeInstanceId,
391    replica_id: ReplicaId,
392    metrics: ComputeControllerMetrics,
393
394    /// Metrics counters, wrapped in an `Arc` to be shareable between threads.
395    pub inner: Arc<ReplicaMetricsInner>,
396}
397
398/// Per-replica metrics counters.
399#[derive(Debug)]
400pub struct ReplicaMetricsInner {
401    commands_total: CommandMetrics<IntCounter>,
402    command_message_bytes_total: IntCounter,
403    responses_total: ResponseMetrics<IntCounter>,
404    response_message_bytes_total: IntCounter,
405
406    /// Gauge tracking the size of the compute command queue.
407    pub command_queue_size: UIntGauge,
408    /// Gauge tracking the size of the hydration queue.
409    pub hydration_queue_size: UIntGauge,
410
411    /// Counter tracking the total number of (re-)connects.
412    replica_connects_total: IntCounter,
413    /// Counter tracking the total time spent waiting for (re-)connects.
414    replica_connect_wait_time_seconds_total: Counter,
415}
416
417impl ReplicaMetrics {
418    pub(crate) fn for_collection(
419        &self,
420        collection_id: GlobalId,
421    ) -> Option<ReplicaCollectionMetrics> {
422        // In an effort to reduce the cardinality of timeseries created, we collect metrics only
423        // for non-transient collections. This is roughly equivalent to "long-lived" collections,
424        // with the exception of subscribes which may or may not be long-lived. We might want to
425        // change this policy in the future to track subscribes as well.
426        if collection_id.is_transient() {
427            return None;
428        }
429
430        let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
431            collection_id.to_string(),
432            Some(self.instance_id.to_string()),
433            Some(self.replica_id.to_string()),
434        );
435
436        Some(ReplicaCollectionMetrics { wallclock_lag })
437    }
438
439    /// Observe a successful replica connection.
440    pub(crate) fn observe_connect(&self) {
441        self.inner.replica_connects_total.inc();
442    }
443
444    /// Observe time spent waiting for a replica connection.
445    pub(crate) fn observe_connect_time(&self, wait_time: Duration) {
446        self.inner
447            .replica_connect_wait_time_seconds_total
448            .inc_by(wait_time.as_secs_f64());
449    }
450}
451
452impl transport::Metrics<ComputeCommand, ComputeResponse> for ReplicaMetrics {
453    fn bytes_sent(&mut self, len: usize) {
454        self.inner
455            .command_message_bytes_total
456            .inc_by(u64::cast_from(len));
457    }
458
459    fn bytes_received(&mut self, len: usize) {
460        self.inner
461            .response_message_bytes_total
462            .inc_by(u64::cast_from(len));
463    }
464
465    fn message_sent(&mut self, msg: &ComputeCommand) {
466        self.inner.commands_total.for_command(msg).inc();
467    }
468
469    fn message_received(&mut self, msg: &ComputeResponse) {
470        self.inner.responses_total.for_response(msg).inc();
471    }
472}
473
474/// Per-replica-and-collection metrics.
475#[derive(Debug)]
476pub(crate) struct ReplicaCollectionMetrics {
477    /// Metrics tracking dataflow wallclock lag.
478    pub wallclock_lag: WallclockLagMetrics,
479}
480
/// One metric of type `M` for each `ComputeCommand` type.
#[derive(Debug, Clone)]
pub struct CommandMetrics<M> {
    /// Metric for `Hello` commands.
    pub hello: M,
    /// Metric for `CreateInstance` commands.
    pub create_instance: M,
    /// Metric for `CreateDataflow` commands.
    pub create_dataflow: M,
    /// Metric for `Schedule` commands.
    pub schedule: M,
    /// Metric for `AllowCompaction` commands.
    pub allow_compaction: M,
    /// Metric for `Peek` commands.
    pub peek: M,
    /// Metric for `CancelPeek` commands.
    pub cancel_peek: M,
    /// Metric for `InitializationComplete` commands.
    pub initialization_complete: M,
    /// Metric for `UpdateConfiguration` commands.
    pub update_configuration: M,
    /// Metric for `AllowWrites` commands.
    pub allow_writes: M,
}
505
506impl<M> CommandMetrics<M> {
507    /// TODO(database-issues#7533): Add documentation.
508    pub fn build<F>(build_metric: F) -> Self
509    where
510        F: Fn(&str) -> M,
511    {
512        Self {
513            hello: build_metric("hello"),
514            create_instance: build_metric("create_instance"),
515            create_dataflow: build_metric("create_dataflow"),
516            schedule: build_metric("schedule"),
517            allow_compaction: build_metric("allow_compaction"),
518            peek: build_metric("peek"),
519            cancel_peek: build_metric("cancel_peek"),
520            initialization_complete: build_metric("initialization_complete"),
521            update_configuration: build_metric("update_configuration"),
522            allow_writes: build_metric("allow_writes"),
523        }
524    }
525
526    fn for_all<F>(&self, f: F)
527    where
528        F: Fn(&M),
529    {
530        f(&self.hello);
531        f(&self.create_instance);
532        f(&self.initialization_complete);
533        f(&self.update_configuration);
534        f(&self.create_dataflow);
535        f(&self.schedule);
536        f(&self.allow_compaction);
537        f(&self.peek);
538        f(&self.cancel_peek);
539        f(&self.allow_writes);
540    }
541
542    /// TODO(database-issues#7533): Add documentation.
543    pub fn for_command(&self, command: &ComputeCommand) -> &M {
544        use ComputeCommand::*;
545
546        match command {
547            Hello { .. } => &self.hello,
548            CreateInstance(_) => &self.create_instance,
549            InitializationComplete => &self.initialization_complete,
550            UpdateConfiguration(_) => &self.update_configuration,
551            CreateDataflow(_) => &self.create_dataflow,
552            Schedule(_) => &self.schedule,
553            AllowCompaction { .. } => &self.allow_compaction,
554            Peek(_) => &self.peek,
555            CancelPeek { .. } => &self.cancel_peek,
556            AllowWrites { .. } => &self.allow_writes,
557        }
558    }
559}
560
/// One metric of type `M` for each `ComputeResponse` type.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `Frontiers` responses.
    frontiers: M,
    /// Metric for `PeekResponse` responses.
    peek_response: M,
    /// Metric for `SubscribeResponse` responses.
    subscribe_response: M,
    /// Metric for `CopyToResponse` responses.
    copy_to_response: M,
    /// Metric for `Status` responses.
    status: M,
}
570
571impl<M> ResponseMetrics<M> {
572    fn build<F>(build_metric: F) -> Self
573    where
574        F: Fn(&str) -> M,
575    {
576        Self {
577            frontiers: build_metric("frontiers"),
578            peek_response: build_metric("peek_response"),
579            subscribe_response: build_metric("subscribe_response"),
580            copy_to_response: build_metric("copy_to_response"),
581            status: build_metric("status"),
582        }
583    }
584
585    fn for_response(&self, response: &ComputeResponse) -> &M {
586        use ComputeResponse::*;
587
588        match response {
589            Frontiers(..) => &self.frontiers,
590            PeekResponse(..) => &self.peek_response,
591            SubscribeResponse(..) => &self.subscribe_response,
592            CopyToResponse(..) => &self.copy_to_response,
593            Status(..) => &self.status,
594        }
595    }
596}
597
598/// Metrics tracked by the command history.
599#[derive(Debug)]
600pub struct HistoryMetrics<G> {
601    /// Metrics tracking command counts.
602    pub command_counts: CommandMetrics<G>,
603    /// Metric tracking the dataflow count.
604    pub dataflow_count: G,
605}
606
607impl<G> HistoryMetrics<G>
608where
609    G: Borrow<mz_ore::metrics::UIntGauge>,
610{
611    /// Reset all tracked counts to 0.
612    pub fn reset(&self) {
613        self.command_counts.for_all(|m| m.borrow().set(0));
614        self.dataflow_count.borrow().set(0);
615    }
616}
617
/// One metric of type `M` for each possible result of a finished peek.
#[derive(Debug)]
pub struct PeekMetrics<M> {
    /// Metric for peeks answered with `PeekResponse::Rows`.
    rows: M,
    /// Metric for peeks answered with `PeekResponse::Stashed`.
    rows_stashed: M,
    /// Metric for peeks answered with `PeekResponse::Error`.
    error: M,
    /// Metric for peeks answered with `PeekResponse::Canceled`.
    canceled: M,
}
626
627impl<M> PeekMetrics<M> {
628    fn build<F>(build_metric: F) -> Self
629    where
630        F: Fn(&str) -> M,
631    {
632        Self {
633            rows: build_metric("rows"),
634            rows_stashed: build_metric("rows_stashed"),
635            error: build_metric("error"),
636            canceled: build_metric("canceled"),
637        }
638    }
639
640    fn for_peek_response(&self, response: &PeekResponse) -> &M {
641        use PeekResponse::*;
642
643        match response {
644            Rows(_) => &self.rows,
645            Stashed(_) => &self.rows_stashed,
646            Error(_) => &self.error,
647            Canceled => &self.canceled,
648        }
649    }
650}