Skip to main content

mz_compute_client/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for the compute controller components
11
12use std::borrow::Borrow;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_cluster_client::ReplicaId;
17use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
18use mz_compute_types::ComputeInstanceId;
19use mz_ore::cast::CastFrom;
20use mz_ore::metric;
21use mz_ore::metrics::raw::UIntGaugeVec;
22use mz_ore::metrics::{
23    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec,
24    IntCounterVec, MetricTag, MetricVecExt, MetricVisibility, MetricsRegistry,
25};
26use mz_ore::stats::histogram_seconds_buckets;
27use mz_repr::GlobalId;
28use mz_service::transport;
29use prometheus::core::{AtomicF64, AtomicU64};
30
31use crate::protocol::command::ComputeCommand;
32use crate::protocol::response::{ComputeResponse, PeekResponse};
33
/// Delete-on-drop counter over `AtomicF64` values, labeled by a `Vec<String>`.
pub(crate) type Counter = DeleteOnDropCounter<AtomicF64, Vec<String>>;
/// Delete-on-drop counter over `AtomicU64` values, labeled by a `Vec<String>`.
pub(crate) type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
/// Delete-on-drop gauge over `AtomicU64` values, labeled by a `Vec<String>`.
pub(crate) type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
/// Delete-on-drop histogram, labeled by a `Vec<String>`.
type Histogram = DeleteOnDropHistogram<Vec<String>>;
38
/// Compute controller metrics.
///
/// Holds the metric vectors registered by [`ComputeControllerMetrics::new`]. Handles bound
/// to the labels of a single compute instance are derived from them through
/// [`ComputeControllerMetrics::for_instance`].
#[derive(Debug, Clone)]
pub struct ComputeControllerMetrics {
    // compute protocol (labeled by instance and replica; the `*_total` message counters are
    // additionally labeled by command/response type)
    commands_total: IntCounterVec,
    command_message_bytes_total: IntCounterVec,
    responses_total: IntCounterVec,
    response_message_bytes_total: IntCounterVec,

    // controller state (labeled by instance; the queue sizes are additionally labeled by
    // replica)
    replica_count: UIntGaugeVec,
    collection_count: UIntGaugeVec,
    collection_unscheduled_count: UIntGaugeVec,
    peek_count: UIntGaugeVec,
    subscribe_count: UIntGaugeVec,
    copy_to_count: UIntGaugeVec,
    command_queue_size: UIntGaugeVec,
    response_send_count: IntCounterVec,
    response_recv_count: IntCounterVec,
    hydration_queue_size: UIntGaugeVec,

    // command history (labeled by instance; the command count also by command type)
    history_command_count: UIntGaugeVec,
    history_dataflow_count: UIntGaugeVec,

    // peeks (labeled by instance and peek result)
    peeks_total: IntCounterVec,
    peek_duration_seconds: HistogramVec,

    // replica connections (labeled by instance; the totals additionally by replica)
    connected_replica_count: UIntGaugeVec,
    replica_connects_total: IntCounterVec,
    replica_connect_wait_time_seconds_total: CounterVec,

    /// Metrics shared with the storage controller.
    shared: ControllerMetrics,
}
76
77impl ComputeControllerMetrics {
78    /// Create a metrics instance registered into the given registry.
79    pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
80        ComputeControllerMetrics {
81            commands_total: metrics_registry.register(metric!(
82                name: "mz_compute_commands_total",
83                help: "The total number of compute commands sent.",
84                var_labels: ["instance_id", "replica_id", "command_type"],
85                visibility: MetricVisibility::Public,
86                tags: [MetricTag::Compute],
87            )),
88            command_message_bytes_total: metrics_registry.register(metric!(
89                name: "mz_compute_command_message_bytes_total",
90                help: "The total number of bytes sent in compute command messages.",
91                var_labels: ["instance_id", "replica_id"],
92            )),
93            responses_total: metrics_registry.register(metric!(
94                name: "mz_compute_responses_total",
95                help: "The total number of compute responses sent.",
96                var_labels: ["instance_id", "replica_id", "response_type"],
97            )),
98            response_message_bytes_total: metrics_registry.register(metric!(
99                name: "mz_compute_response_message_bytes_total",
100                help: "The total number of bytes sent in compute response messages.",
101                var_labels: ["instance_id", "replica_id"],
102            )),
103            replica_count: metrics_registry.register(metric!(
104                name: "mz_compute_controller_replica_count",
105                help: "The number of replicas.",
106                var_labels: ["instance_id"],
107            )),
108            collection_count: metrics_registry.register(metric!(
109                name: "mz_compute_controller_collection_count",
110                help: "The number of installed compute collections.",
111                var_labels: ["instance_id"],
112            )),
113            collection_unscheduled_count: metrics_registry.register(metric!(
114                name: "mz_compute_controller_collection_unscheduled_count",
115                help: "The number of installed but unscheduled compute collections.",
116                var_labels: ["instance_id"],
117            )),
118            peek_count: metrics_registry.register(metric!(
119                name: "mz_compute_controller_peek_count",
120                help: "The number of pending peeks.",
121                var_labels: ["instance_id"],
122            )),
123            subscribe_count: metrics_registry.register(metric!(
124                name: "mz_compute_controller_subscribe_count",
125                help: "The number of active subscribes.",
126                var_labels: ["instance_id"],
127            )),
128            copy_to_count: metrics_registry.register(metric!(
129                name: "mz_compute_controller_copy_to_count",
130                help: "The number of active copy tos.",
131                var_labels: ["instance_id"],
132            )),
133            command_queue_size: metrics_registry.register(metric!(
134                name: "mz_compute_controller_command_queue_size",
135                help: "The size of the compute command queue.",
136                var_labels: ["instance_id", "replica_id"],
137            )),
138            response_send_count: metrics_registry.register(metric!(
139                name: "mz_compute_controller_response_send_count",
140                help: "The number of sends on the compute response queue.",
141                var_labels: ["instance_id"],
142            )),
143            response_recv_count: metrics_registry.register(metric!(
144                name: "mz_compute_controller_response_recv_count",
145                help: "The number of receives on the compute response queue.",
146                var_labels: ["instance_id"],
147            )),
148            hydration_queue_size: metrics_registry.register(metric!(
149                name: "mz_compute_controller_hydration_queue_size",
150                help: "The size of the compute hydration queue.",
151                var_labels: ["instance_id", "replica_id"],
152                visibility: MetricVisibility::Public,
153                tags: [MetricTag::Compute],
154            )),
155            history_command_count: metrics_registry.register(metric!(
156                name: "mz_compute_controller_history_command_count",
157                help: "The number of commands in the controller's command history.",
158                var_labels: ["instance_id", "command_type"],
159            )),
160            history_dataflow_count: metrics_registry.register(metric!(
161                name: "mz_compute_controller_history_dataflow_count",
162                help: "The number of dataflows in the controller's command history.",
163                var_labels: ["instance_id"],
164            )),
165            peeks_total: metrics_registry.register(metric!(
166                name: "mz_compute_peeks_total",
167                help: "The total number of peeks served.",
168                var_labels: ["instance_id", "result"],
169            )),
170            peek_duration_seconds: metrics_registry.register(metric!(
171                name: "mz_compute_peek_duration_seconds",
172                help: "A histogram of peek durations since restart.",
173                var_labels: ["instance_id", "result"],
174                buckets: histogram_seconds_buckets(0.000_500, 32.),
175                visibility: MetricVisibility::Public,
176                tags: [MetricTag::Compute],
177            )),
178            connected_replica_count: metrics_registry.register(metric!(
179                name: "mz_compute_controller_connected_replica_count",
180                help: "The number of replicas successfully connected to the compute controller.",
181                var_labels: ["instance_id"],
182            )),
183            replica_connects_total: metrics_registry.register(metric!(
184                name: "mz_compute_controller_replica_connects_total",
185                help: "The total number of replica (re-)connections made by the compute controller.",
186                var_labels: ["instance_id", "replica_id"],
187            )),
188            replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
189                name: "mz_compute_controller_replica_connect_wait_time_seconds_total",
190                help: "The total time the compute controller spent waiting for replica (re-)connection.",
191                var_labels: ["instance_id", "replica_id"],
192            )),
193
194            shared,
195        }
196    }
197
198    /// Return an object suitable for tracking metrics for the given compute instance.
199    pub fn for_instance(&self, instance_id: ComputeInstanceId) -> InstanceMetrics {
200        let labels = vec![instance_id.to_string()];
201        let replica_count = self.replica_count.get_delete_on_drop_metric(labels.clone());
202        let collection_count = self
203            .collection_count
204            .get_delete_on_drop_metric(labels.clone());
205        let collection_unscheduled_count = self
206            .collection_unscheduled_count
207            .get_delete_on_drop_metric(labels.clone());
208        let peek_count = self.peek_count.get_delete_on_drop_metric(labels.clone());
209        let subscribe_count = self
210            .subscribe_count
211            .get_delete_on_drop_metric(labels.clone());
212        let copy_to_count = self.copy_to_count.get_delete_on_drop_metric(labels.clone());
213        let history_command_count = CommandMetrics::build(|typ| {
214            let labels = labels.iter().cloned().chain([typ.into()]).collect();
215            self.history_command_count.get_delete_on_drop_metric(labels)
216        });
217        let history_dataflow_count = self
218            .history_dataflow_count
219            .get_delete_on_drop_metric(labels.clone());
220        let peeks_total = PeekMetrics::build(|typ| {
221            let labels = labels.iter().cloned().chain([typ.into()]).collect();
222            self.peeks_total.get_delete_on_drop_metric(labels)
223        });
224        let peek_duration_seconds = PeekMetrics::build(|typ| {
225            let labels = labels.iter().cloned().chain([typ.into()]).collect();
226            self.peek_duration_seconds.get_delete_on_drop_metric(labels)
227        });
228        let response_send_count = self
229            .response_send_count
230            .get_delete_on_drop_metric(labels.clone());
231        let response_recv_count = self
232            .response_recv_count
233            .get_delete_on_drop_metric(labels.clone());
234        let connected_replica_count = self
235            .connected_replica_count
236            .get_delete_on_drop_metric(labels);
237
238        InstanceMetrics {
239            instance_id,
240            metrics: self.clone(),
241            replica_count,
242            collection_count,
243            collection_unscheduled_count,
244            copy_to_count,
245            peek_count,
246            subscribe_count,
247            history_command_count,
248            history_dataflow_count,
249            peeks_total,
250            peek_duration_seconds,
251            response_send_count,
252            response_recv_count,
253            connected_replica_count,
254        }
255    }
256}
257
/// Per-instance metrics.
///
/// Bundles the metric handles labeled with a single compute instance's ID, together with
/// what is needed to derive per-replica and command-history metrics for that instance.
#[derive(Debug)]
pub struct InstanceMetrics {
    /// ID of the compute instance these metrics are labeled with.
    instance_id: ComputeInstanceId,
    /// The controller-wide metrics from which further handles are derived.
    metrics: ComputeControllerMetrics,

    /// Gauge tracking the number of replicas.
    pub replica_count: UIntGauge,
    /// Gauge tracking the number of installed compute collections.
    pub collection_count: UIntGauge,
    /// Gauge tracking the number of installed but unscheduled compute collections.
    pub collection_unscheduled_count: UIntGauge,
    /// Gauge tracking the number of pending peeks.
    pub peek_count: UIntGauge,
    /// Gauge tracking the number of active subscribes.
    pub subscribe_count: UIntGauge,
    /// Gauge tracking the number of active COPY TO queries.
    pub copy_to_count: UIntGauge,
    /// Gauges tracking the number of commands in the command history.
    pub history_command_count: CommandMetrics<UIntGauge>,
    /// Gauge tracking the number of dataflows in the command history.
    pub history_dataflow_count: UIntGauge,
    /// Counter tracking the total number of peeks served.
    pub peeks_total: PeekMetrics<IntCounter>,
    /// Histogram tracking peek durations.
    pub peek_duration_seconds: PeekMetrics<Histogram>,
    /// Counter tracking the number of sends on the compute response queue.
    pub response_send_count: IntCounter,
    /// Counter tracking the number of receives on the compute response queue.
    pub response_recv_count: IntCounter,
    /// Gauge tracking the number of connected replicas.
    pub connected_replica_count: UIntGauge,
}
291
292impl InstanceMetrics {
293    /// TODO(database-issues#7533): Add documentation.
294    pub fn for_replica(&self, replica_id: ReplicaId) -> ReplicaMetrics {
295        let labels = vec![self.instance_id.to_string(), replica_id.to_string()];
296        let extended_labels = |extra: &str| {
297            labels
298                .iter()
299                .cloned()
300                .chain([extra.into()])
301                .collect::<Vec<_>>()
302        };
303
304        let commands_total = CommandMetrics::build(|typ| {
305            let labels = extended_labels(typ);
306            self.metrics
307                .commands_total
308                .get_delete_on_drop_metric(labels)
309        });
310        let responses_total = ResponseMetrics::build(|typ| {
311            let labels = extended_labels(typ);
312            self.metrics
313                .responses_total
314                .get_delete_on_drop_metric(labels)
315        });
316
317        let command_message_bytes_total = self
318            .metrics
319            .command_message_bytes_total
320            .get_delete_on_drop_metric(labels.clone());
321        let response_message_bytes_total = self
322            .metrics
323            .response_message_bytes_total
324            .get_delete_on_drop_metric(labels.clone());
325
326        let command_queue_size = self
327            .metrics
328            .command_queue_size
329            .get_delete_on_drop_metric(labels.clone());
330        let hydration_queue_size = self
331            .metrics
332            .hydration_queue_size
333            .get_delete_on_drop_metric(labels.clone());
334
335        let replica_connects_total = self
336            .metrics
337            .replica_connects_total
338            .get_delete_on_drop_metric(labels.clone());
339        let replica_connect_wait_time_seconds_total = self
340            .metrics
341            .replica_connect_wait_time_seconds_total
342            .get_delete_on_drop_metric(labels);
343
344        ReplicaMetrics {
345            instance_id: self.instance_id,
346            replica_id,
347            metrics: self.metrics.clone(),
348            inner: Arc::new(ReplicaMetricsInner {
349                commands_total,
350                command_message_bytes_total,
351                responses_total,
352                response_message_bytes_total,
353                command_queue_size,
354                hydration_queue_size,
355                replica_connects_total,
356                replica_connect_wait_time_seconds_total,
357            }),
358        }
359    }
360
361    /// TODO(database-issues#7533): Add documentation.
362    pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
363        let labels = vec![self.instance_id.to_string()];
364        let command_counts = CommandMetrics::build(|typ| {
365            let labels = labels.iter().cloned().chain([typ.into()]).collect();
366            self.metrics
367                .history_command_count
368                .get_delete_on_drop_metric(labels)
369        });
370        let dataflow_count = self
371            .metrics
372            .history_dataflow_count
373            .get_delete_on_drop_metric(labels);
374
375        HistoryMetrics {
376            command_counts,
377            dataflow_count,
378        }
379    }
380
381    /// Reflect the given peek response in the metrics.
382    pub fn observe_peek_response(&self, response: &PeekResponse, duration: Duration) {
383        self.peeks_total.for_peek_response(response).inc();
384        self.peek_duration_seconds
385            .for_peek_response(response)
386            .observe(duration.as_secs_f64());
387    }
388}
389
/// Per-replica metrics.
///
/// Cloning shares the counters in `inner` through the `Arc`.
#[derive(Debug, Clone)]
pub struct ReplicaMetrics {
    /// ID of the compute instance the replica belongs to.
    instance_id: ComputeInstanceId,
    /// ID of the replica these metrics are labeled with.
    replica_id: ReplicaId,
    /// The controller-wide metrics, used to derive per-collection metrics.
    metrics: ComputeControllerMetrics,

    /// Metrics counters, wrapped in an `Arc` to be shareable between threads.
    pub inner: Arc<ReplicaMetricsInner>,
}
400
/// Per-replica metrics counters.
#[derive(Debug)]
pub struct ReplicaMetricsInner {
    /// Counters of commands, keyed by command type.
    commands_total: CommandMetrics<IntCounter>,
    /// Counter of bytes in command messages.
    command_message_bytes_total: IntCounter,
    /// Counters of responses, keyed by response type.
    responses_total: ResponseMetrics<IntCounter>,
    /// Counter of bytes in response messages.
    response_message_bytes_total: IntCounter,

    /// Gauge tracking the size of the compute command queue.
    pub command_queue_size: UIntGauge,
    /// Gauge tracking the size of the hydration queue.
    pub hydration_queue_size: UIntGauge,

    /// Counter tracking the total number of (re-)connects.
    replica_connects_total: IntCounter,
    /// Counter tracking the total time spent waiting for (re-)connects.
    replica_connect_wait_time_seconds_total: Counter,
}
419
420impl ReplicaMetrics {
421    pub(crate) fn for_collection(
422        &self,
423        collection_id: GlobalId,
424    ) -> Option<ReplicaCollectionMetrics> {
425        // In an effort to reduce the cardinality of timeseries created, we collect metrics only
426        // for non-transient collections. This is roughly equivalent to "long-lived" collections,
427        // with the exception of subscribes which may or may not be long-lived. We might want to
428        // change this policy in the future to track subscribes as well.
429        if collection_id.is_transient() {
430            return None;
431        }
432
433        let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
434            collection_id.to_string(),
435            Some(self.instance_id.to_string()),
436            Some(self.replica_id.to_string()),
437        );
438
439        Some(ReplicaCollectionMetrics { wallclock_lag })
440    }
441
442    /// Observe a successful replica connection.
443    pub(crate) fn observe_connect(&self) {
444        self.inner.replica_connects_total.inc();
445    }
446
447    /// Observe time spent waiting for a replica connection.
448    pub(crate) fn observe_connect_time(&self, wait_time: Duration) {
449        self.inner
450            .replica_connect_wait_time_seconds_total
451            .inc_by(wait_time.as_secs_f64());
452    }
453}
454
455impl transport::Metrics<ComputeCommand, ComputeResponse> for ReplicaMetrics {
456    fn bytes_sent(&mut self, len: usize) {
457        self.inner
458            .command_message_bytes_total
459            .inc_by(u64::cast_from(len));
460    }
461
462    fn bytes_received(&mut self, len: usize) {
463        self.inner
464            .response_message_bytes_total
465            .inc_by(u64::cast_from(len));
466    }
467
468    fn message_sent(&mut self, msg: &ComputeCommand) {
469        self.inner.commands_total.for_command(msg).inc();
470    }
471
472    fn message_received(&mut self, msg: &ComputeResponse) {
473        self.inner.responses_total.for_response(msg).inc();
474    }
475}
476
/// Per-replica-and-collection metrics.
///
/// Created by `ReplicaMetrics::for_collection`, for non-transient collections only.
#[derive(Debug)]
pub(crate) struct ReplicaCollectionMetrics {
    /// Metrics tracking dataflow wallclock lag.
    pub wallclock_lag: WallclockLagMetrics,
}
483
/// Metrics keyed by `ComputeCommand` type.
///
/// Holds one metric of type `M` per command variant.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metrics for `Hello`.
    pub hello: M,
    /// Metrics for `CreateInstance`.
    pub create_instance: M,
    /// Metrics for `CreateDataflow`.
    pub create_dataflow: M,
    /// Metrics for `Schedule`.
    pub schedule: M,
    /// Metrics for `AllowCompaction`.
    pub allow_compaction: M,
    /// Metrics for `Peek`.
    pub peek: M,
    /// Metrics for `CancelPeek`.
    pub cancel_peek: M,
    /// Metrics for `InitializationComplete`.
    pub initialization_complete: M,
    /// Metrics for `UpdateConfiguration`.
    pub update_configuration: M,
    /// Metrics for `AllowWrites`.
    pub allow_writes: M,
}
508
509impl<M> CommandMetrics<M> {
510    /// TODO(database-issues#7533): Add documentation.
511    pub fn build<F>(build_metric: F) -> Self
512    where
513        F: Fn(&str) -> M,
514    {
515        Self {
516            hello: build_metric("hello"),
517            create_instance: build_metric("create_instance"),
518            create_dataflow: build_metric("create_dataflow"),
519            schedule: build_metric("schedule"),
520            allow_compaction: build_metric("allow_compaction"),
521            peek: build_metric("peek"),
522            cancel_peek: build_metric("cancel_peek"),
523            initialization_complete: build_metric("initialization_complete"),
524            update_configuration: build_metric("update_configuration"),
525            allow_writes: build_metric("allow_writes"),
526        }
527    }
528
529    fn for_all<F>(&self, f: F)
530    where
531        F: Fn(&M),
532    {
533        f(&self.hello);
534        f(&self.create_instance);
535        f(&self.initialization_complete);
536        f(&self.update_configuration);
537        f(&self.create_dataflow);
538        f(&self.schedule);
539        f(&self.allow_compaction);
540        f(&self.peek);
541        f(&self.cancel_peek);
542        f(&self.allow_writes);
543    }
544
545    /// TODO(database-issues#7533): Add documentation.
546    pub fn for_command(&self, command: &ComputeCommand) -> &M {
547        use ComputeCommand::*;
548
549        match command {
550            Hello { .. } => &self.hello,
551            CreateInstance(_) => &self.create_instance,
552            InitializationComplete => &self.initialization_complete,
553            UpdateConfiguration(_) => &self.update_configuration,
554            CreateDataflow(_) => &self.create_dataflow,
555            Schedule(_) => &self.schedule,
556            AllowCompaction { .. } => &self.allow_compaction,
557            Peek(_) => &self.peek,
558            CancelPeek { .. } => &self.cancel_peek,
559            AllowWrites { .. } => &self.allow_writes,
560        }
561    }
562}
563
/// Metrics keyed by `ComputeResponse` type.
///
/// Holds one metric of type `M` per response variant.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metrics for `Frontiers`.
    frontiers: M,
    /// Metrics for `PeekResponse`.
    peek_response: M,
    /// Metrics for `SubscribeResponse`.
    subscribe_response: M,
    /// Metrics for `CopyToResponse`.
    copy_to_response: M,
    /// Metrics for `Status`.
    status: M,
}
573
574impl<M> ResponseMetrics<M> {
575    fn build<F>(build_metric: F) -> Self
576    where
577        F: Fn(&str) -> M,
578    {
579        Self {
580            frontiers: build_metric("frontiers"),
581            peek_response: build_metric("peek_response"),
582            subscribe_response: build_metric("subscribe_response"),
583            copy_to_response: build_metric("copy_to_response"),
584            status: build_metric("status"),
585        }
586    }
587
588    fn for_response(&self, response: &ComputeResponse) -> &M {
589        use ComputeResponse::*;
590
591        match response {
592            Frontiers(..) => &self.frontiers,
593            PeekResponse(..) => &self.peek_response,
594            SubscribeResponse(..) => &self.subscribe_response,
595            CopyToResponse(..) => &self.copy_to_response,
596            Status(..) => &self.status,
597        }
598    }
599}
600
/// Metrics tracked by the command history.
///
/// Generic over the gauge type `G`; `reset` is available when `G` can be borrowed as a
/// `mz_ore::metrics::UIntGauge`.
#[derive(Debug)]
pub struct HistoryMetrics<G> {
    /// Metrics tracking command counts.
    pub command_counts: CommandMetrics<G>,
    /// Metric tracking the dataflow count.
    pub dataflow_count: G,
}
609
610impl<G> HistoryMetrics<G>
611where
612    G: Borrow<mz_ore::metrics::UIntGauge>,
613{
614    /// Reset all tracked counts to 0.
615    pub fn reset(&self) {
616        self.command_counts.for_all(|m| m.borrow().set(0));
617        self.dataflow_count.borrow().set(0);
618    }
619}
620
/// Metrics for finished peeks, keyed by peek result.
///
/// Holds one metric of type `M` per `PeekResponse` variant.
#[derive(Debug)]
pub struct PeekMetrics<M> {
    /// Metric for `PeekResponse::Rows`.
    rows: M,
    /// Metric for `PeekResponse::Stashed`.
    rows_stashed: M,
    /// Metric for `PeekResponse::Error`.
    error: M,
    /// Metric for `PeekResponse::Canceled`.
    canceled: M,
}
629
630impl<M> PeekMetrics<M> {
631    fn build<F>(build_metric: F) -> Self
632    where
633        F: Fn(&str) -> M,
634    {
635        Self {
636            rows: build_metric("rows"),
637            rows_stashed: build_metric("rows_stashed"),
638            error: build_metric("error"),
639            canceled: build_metric("canceled"),
640        }
641    }
642
643    fn for_peek_response(&self, response: &PeekResponse) -> &M {
644        use PeekResponse::*;
645
646        match response {
647            Rows(_) => &self.rows,
648            Stashed(_) => &self.rows_stashed,
649            Error(_) => &self.error,
650            Canceled => &self.canceled,
651        }
652    }
653}