mz_compute_client/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for the compute controller components
11
12use std::borrow::Borrow;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_cluster_client::ReplicaId;
17use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
18use mz_compute_types::ComputeInstanceId;
19use mz_ore::cast::CastFrom;
20use mz_ore::metric;
21use mz_ore::metrics::raw::UIntGaugeVec;
22use mz_ore::metrics::{
23    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec,
24    IntCounterVec, MetricVecExt, MetricsRegistry,
25};
26use mz_ore::stats::histogram_seconds_buckets;
27use mz_repr::GlobalId;
28use mz_service::codec::StatsCollector;
29use prometheus::core::{AtomicF64, AtomicU64};
30
31use crate::protocol::command::{ComputeCommand, ProtoComputeCommand};
32use crate::protocol::response::{PeekResponse, ProtoComputeResponse};
33
34pub(crate) type Counter = DeleteOnDropCounter<AtomicF64, Vec<String>>;
35pub(crate) type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
36pub(crate) type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
37type Histogram = DeleteOnDropHistogram<Vec<String>>;
38
39/// Compute controller metrics.
40#[derive(Debug, Clone)]
41pub struct ComputeControllerMetrics {
42    // compute protocol
43    commands_total: IntCounterVec,
44    command_message_bytes_total: IntCounterVec,
45    responses_total: IntCounterVec,
46    response_message_bytes_total: IntCounterVec,
47
48    // controller state
49    replica_count: UIntGaugeVec,
50    collection_count: UIntGaugeVec,
51    collection_unscheduled_count: UIntGaugeVec,
52    peek_count: UIntGaugeVec,
53    subscribe_count: UIntGaugeVec,
54    copy_to_count: UIntGaugeVec,
55    command_queue_size: UIntGaugeVec,
56    response_send_count: IntCounterVec,
57    response_recv_count: IntCounterVec,
58    hydration_queue_size: UIntGaugeVec,
59
60    // command history
61    history_command_count: UIntGaugeVec,
62    history_dataflow_count: UIntGaugeVec,
63
64    // peeks
65    peeks_total: IntCounterVec,
66    peek_duration_seconds: HistogramVec,
67
68    // replica connections
69    connected_replica_count: UIntGaugeVec,
70    replica_connects_total: IntCounterVec,
71    replica_connect_wait_time_seconds_total: CounterVec,
72
73    /// Metrics shared with the storage controller.
74    shared: ControllerMetrics,
75}
76
77impl ComputeControllerMetrics {
78    /// Create a metrics instance registered into the given registry.
79    pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
80        ComputeControllerMetrics {
81            commands_total: metrics_registry.register(metric!(
82                name: "mz_compute_commands_total",
83                help: "The total number of compute commands sent.",
84                var_labels: ["instance_id", "replica_id", "command_type"],
85            )),
86            command_message_bytes_total: metrics_registry.register(metric!(
87                name: "mz_compute_command_message_bytes_total",
88                help: "The total number of bytes sent in compute command messages.",
89                var_labels: ["instance_id", "replica_id", "command_type"],
90            )),
91            responses_total: metrics_registry.register(metric!(
92                name: "mz_compute_responses_total",
93                help: "The total number of compute responses sent.",
94                var_labels: ["instance_id", "replica_id", "response_type"],
95            )),
96            response_message_bytes_total: metrics_registry.register(metric!(
97                name: "mz_compute_response_message_bytes_total",
98                help: "The total number of bytes sent in compute response messages.",
99                var_labels: ["instance_id", "replica_id", "response_type"],
100            )),
101            replica_count: metrics_registry.register(metric!(
102                name: "mz_compute_controller_replica_count",
103                help: "The number of replicas.",
104                var_labels: ["instance_id"],
105            )),
106            collection_count: metrics_registry.register(metric!(
107                name: "mz_compute_controller_collection_count",
108                help: "The number of installed compute collections.",
109                var_labels: ["instance_id"],
110            )),
111            collection_unscheduled_count: metrics_registry.register(metric!(
112                name: "mz_compute_controller_collection_unscheduled_count",
113                help: "The number of installed but unscheduled compute collections.",
114                var_labels: ["instance_id"],
115            )),
116            peek_count: metrics_registry.register(metric!(
117                name: "mz_compute_controller_peek_count",
118                help: "The number of pending peeks.",
119                var_labels: ["instance_id"],
120            )),
121            subscribe_count: metrics_registry.register(metric!(
122                name: "mz_compute_controller_subscribe_count",
123                help: "The number of active subscribes.",
124                var_labels: ["instance_id"],
125            )),
126            copy_to_count: metrics_registry.register(metric!(
127                name: "mz_compute_controller_copy_to_count",
128                help: "The number of active copy tos.",
129                var_labels: ["instance_id"],
130            )),
131            command_queue_size: metrics_registry.register(metric!(
132                name: "mz_compute_controller_command_queue_size",
133                help: "The size of the compute command queue.",
134                var_labels: ["instance_id", "replica_id"],
135            )),
136            response_send_count: metrics_registry.register(metric!(
137                name: "mz_compute_controller_response_send_count",
138                help: "The number of sends on the compute response queue.",
139                var_labels: ["instance_id"],
140            )),
141            response_recv_count: metrics_registry.register(metric!(
142                name: "mz_compute_controller_response_recv_count",
143                help: "The number of receives on the compute response queue.",
144                var_labels: ["instance_id"],
145            )),
146            hydration_queue_size: metrics_registry.register(metric!(
147                name: "mz_compute_controller_hydration_queue_size",
148                help: "The size of the compute hydration queue.",
149                var_labels: ["instance_id", "replica_id"],
150            )),
151            history_command_count: metrics_registry.register(metric!(
152                name: "mz_compute_controller_history_command_count",
153                help: "The number of commands in the controller's command history.",
154                var_labels: ["instance_id", "command_type"],
155            )),
156            history_dataflow_count: metrics_registry.register(metric!(
157                name: "mz_compute_controller_history_dataflow_count",
158                help: "The number of dataflows in the controller's command history.",
159                var_labels: ["instance_id"],
160            )),
161            peeks_total: metrics_registry.register(metric!(
162                name: "mz_compute_peeks_total",
163                help: "The total number of peeks served.",
164                var_labels: ["instance_id", "result"],
165            )),
166            peek_duration_seconds: metrics_registry.register(metric!(
167                name: "mz_compute_peek_duration_seconds",
168                help: "A histogram of peek durations since restart.",
169                var_labels: ["instance_id", "result"],
170                buckets: histogram_seconds_buckets(0.000_500, 32.),
171            )),
172            connected_replica_count: metrics_registry.register(metric!(
173                name: "mz_compute_controller_connected_replica_count",
174                help: "The number of replicas successfully connected to the compute controller.",
175                var_labels: ["instance_id"],
176            )),
177            replica_connects_total: metrics_registry.register(metric!(
178                name: "mz_compute_controller_replica_connects_total",
179                help: "The total number of replica (re-)connections made by the compute controller.",
180                var_labels: ["instance_id", "replica_id"],
181            )),
182            replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
183                name: "mz_compute_controller_replica_connect_wait_time_seconds_total",
184                help: "The total time the compute controller spent waiting for replica (re-)connection.",
185                var_labels: ["instance_id", "replica_id"],
186            )),
187
188            shared,
189        }
190    }
191
192    /// Return an object suitable for tracking metrics for the given compute instance.
193    pub fn for_instance(&self, instance_id: ComputeInstanceId) -> InstanceMetrics {
194        let labels = vec![instance_id.to_string()];
195        let replica_count = self.replica_count.get_delete_on_drop_metric(labels.clone());
196        let collection_count = self
197            .collection_count
198            .get_delete_on_drop_metric(labels.clone());
199        let collection_unscheduled_count = self
200            .collection_unscheduled_count
201            .get_delete_on_drop_metric(labels.clone());
202        let peek_count = self.peek_count.get_delete_on_drop_metric(labels.clone());
203        let subscribe_count = self
204            .subscribe_count
205            .get_delete_on_drop_metric(labels.clone());
206        let copy_to_count = self.copy_to_count.get_delete_on_drop_metric(labels.clone());
207        let history_command_count = CommandMetrics::build(|typ| {
208            let labels = labels.iter().cloned().chain([typ.into()]).collect();
209            self.history_command_count.get_delete_on_drop_metric(labels)
210        });
211        let history_dataflow_count = self
212            .history_dataflow_count
213            .get_delete_on_drop_metric(labels.clone());
214        let peeks_total = PeekMetrics::build(|typ| {
215            let labels = labels.iter().cloned().chain([typ.into()]).collect();
216            self.peeks_total.get_delete_on_drop_metric(labels)
217        });
218        let peek_duration_seconds = PeekMetrics::build(|typ| {
219            let labels = labels.iter().cloned().chain([typ.into()]).collect();
220            self.peek_duration_seconds.get_delete_on_drop_metric(labels)
221        });
222        let response_send_count = self
223            .response_send_count
224            .get_delete_on_drop_metric(labels.clone());
225        let response_recv_count = self
226            .response_recv_count
227            .get_delete_on_drop_metric(labels.clone());
228        let connected_replica_count = self
229            .connected_replica_count
230            .get_delete_on_drop_metric(labels);
231
232        InstanceMetrics {
233            instance_id,
234            metrics: self.clone(),
235            replica_count,
236            collection_count,
237            collection_unscheduled_count,
238            copy_to_count,
239            peek_count,
240            subscribe_count,
241            history_command_count,
242            history_dataflow_count,
243            peeks_total,
244            peek_duration_seconds,
245            response_send_count,
246            response_recv_count,
247            connected_replica_count,
248        }
249    }
250}
251
252/// Per-instance metrics
253#[derive(Debug)]
254pub struct InstanceMetrics {
255    instance_id: ComputeInstanceId,
256    metrics: ComputeControllerMetrics,
257
258    /// Gauge tracking the number of replicas.
259    pub replica_count: UIntGauge,
260    /// Gauge tracking the number of installed compute collections.
261    pub collection_count: UIntGauge,
262    /// Gauge tracking the number of installed but unscheduled compute collections.
263    pub collection_unscheduled_count: UIntGauge,
264    /// Gauge tracking the number of pending peeks.
265    pub peek_count: UIntGauge,
266    /// Gauge tracking the number of active subscribes.
267    pub subscribe_count: UIntGauge,
268    /// Gauge tracking the number of active COPY TO queries.
269    pub copy_to_count: UIntGauge,
270    /// Gauges tracking the number of commands in the command history.
271    pub history_command_count: CommandMetrics<UIntGauge>,
272    /// Gauge tracking the number of dataflows in the command history.
273    pub history_dataflow_count: UIntGauge,
274    /// Counter tracking the total number of peeks served.
275    pub peeks_total: PeekMetrics<IntCounter>,
276    /// Histogram tracking peek durations.
277    pub peek_duration_seconds: PeekMetrics<Histogram>,
278    /// Counter tracking the number of sends on the compute response queue.
279    pub response_send_count: IntCounter,
280    /// Counter tracking the number of receives on the compute response queue.
281    pub response_recv_count: IntCounter,
282    /// Gauge tracking the number of connected replicas.
283    pub connected_replica_count: UIntGauge,
284}
285
286impl InstanceMetrics {
287    /// TODO(database-issues#7533): Add documentation.
288    pub fn for_replica(&self, replica_id: ReplicaId) -> ReplicaMetrics {
289        let labels = vec![self.instance_id.to_string(), replica_id.to_string()];
290        let extended_labels = |extra: &str| {
291            labels
292                .iter()
293                .cloned()
294                .chain([extra.into()])
295                .collect::<Vec<_>>()
296        };
297
298        let commands_total = CommandMetrics::build(|typ| {
299            let labels = extended_labels(typ);
300            self.metrics
301                .commands_total
302                .get_delete_on_drop_metric(labels)
303        });
304        let command_message_bytes_total = CommandMetrics::build(|typ| {
305            let labels = extended_labels(typ);
306            self.metrics
307                .command_message_bytes_total
308                .get_delete_on_drop_metric(labels)
309        });
310        let responses_total = ResponseMetrics::build(|typ| {
311            let labels = extended_labels(typ);
312            self.metrics
313                .responses_total
314                .get_delete_on_drop_metric(labels)
315        });
316        let response_message_bytes_total = ResponseMetrics::build(|typ| {
317            let labels = extended_labels(typ);
318            self.metrics
319                .response_message_bytes_total
320                .get_delete_on_drop_metric(labels)
321        });
322
323        let command_queue_size = self
324            .metrics
325            .command_queue_size
326            .get_delete_on_drop_metric(labels.clone());
327        let hydration_queue_size = self
328            .metrics
329            .hydration_queue_size
330            .get_delete_on_drop_metric(labels.clone());
331
332        let replica_connects_total = self
333            .metrics
334            .replica_connects_total
335            .get_delete_on_drop_metric(labels.clone());
336        let replica_connect_wait_time_seconds_total = self
337            .metrics
338            .replica_connect_wait_time_seconds_total
339            .get_delete_on_drop_metric(labels);
340
341        ReplicaMetrics {
342            instance_id: self.instance_id,
343            replica_id,
344            metrics: self.metrics.clone(),
345            inner: Arc::new(ReplicaMetricsInner {
346                commands_total,
347                command_message_bytes_total,
348                responses_total,
349                response_message_bytes_total,
350                command_queue_size,
351                hydration_queue_size,
352                replica_connects_total,
353                replica_connect_wait_time_seconds_total,
354            }),
355        }
356    }
357
358    /// TODO(database-issues#7533): Add documentation.
359    pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
360        let labels = vec![self.instance_id.to_string()];
361        let command_counts = CommandMetrics::build(|typ| {
362            let labels = labels.iter().cloned().chain([typ.into()]).collect();
363            self.metrics
364                .history_command_count
365                .get_delete_on_drop_metric(labels)
366        });
367        let dataflow_count = self
368            .metrics
369            .history_dataflow_count
370            .get_delete_on_drop_metric(labels);
371
372        HistoryMetrics {
373            command_counts,
374            dataflow_count,
375        }
376    }
377
378    /// Reflect the given peek response in the metrics.
379    pub fn observe_peek_response(&self, response: &PeekResponse, duration: Duration) {
380        self.peeks_total.for_peek_response(response).inc();
381        self.peek_duration_seconds
382            .for_peek_response(response)
383            .observe(duration.as_secs_f64());
384    }
385}
386
387/// Per-replica metrics.
388#[derive(Debug, Clone)]
389pub struct ReplicaMetrics {
390    instance_id: ComputeInstanceId,
391    replica_id: ReplicaId,
392    metrics: ComputeControllerMetrics,
393
394    /// Metrics counters, wrapped in an `Arc` to be shareable between threads.
395    pub inner: Arc<ReplicaMetricsInner>,
396}
397
398/// Per-replica metrics counters.
399#[derive(Debug)]
400pub struct ReplicaMetricsInner {
401    commands_total: CommandMetrics<IntCounter>,
402    command_message_bytes_total: CommandMetrics<IntCounter>,
403    responses_total: ResponseMetrics<IntCounter>,
404    response_message_bytes_total: ResponseMetrics<IntCounter>,
405
406    /// Gauge tracking the size of the compute command queue.
407    pub command_queue_size: UIntGauge,
408    /// Gauge tracking the size of the hydration queue.
409    pub hydration_queue_size: UIntGauge,
410
411    /// Counter tracking the total number of (re-)connects.
412    replica_connects_total: IntCounter,
413    /// Counter tracking the total time spent waiting for (re-)connects.
414    replica_connect_wait_time_seconds_total: Counter,
415}
416
417impl ReplicaMetrics {
418    pub(crate) fn for_collection(
419        &self,
420        collection_id: GlobalId,
421    ) -> Option<ReplicaCollectionMetrics> {
422        // In an effort to reduce the cardinality of timeseries created, we collect metrics only
423        // for non-transient collections. This is roughly equivalent to "long-lived" collections,
424        // with the exception of subscribes which may or may not be long-lived. We might want to
425        // change this policy in the future to track subscribes as well.
426        if collection_id.is_transient() {
427            return None;
428        }
429
430        let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
431            collection_id.to_string(),
432            Some(self.instance_id.to_string()),
433            Some(self.replica_id.to_string()),
434        );
435
436        Some(ReplicaCollectionMetrics { wallclock_lag })
437    }
438
439    /// Observe a successful replica connection.
440    pub(crate) fn observe_connect(&self) {
441        self.inner.replica_connects_total.inc();
442    }
443
444    /// Observe time spent waiting for a replica connection.
445    pub(crate) fn observe_connect_time(&self, wait_time: Duration) {
446        self.inner
447            .replica_connect_wait_time_seconds_total
448            .inc_by(wait_time.as_secs_f64());
449    }
450}
451
452/// Make [`ReplicaMetrics`] pluggable into the gRPC connection.
453impl StatsCollector<ProtoComputeCommand, ProtoComputeResponse> for ReplicaMetrics {
454    fn send_event(&self, item: &ProtoComputeCommand, size: usize) {
455        self.inner.commands_total.for_proto_command(item).inc();
456        self.inner
457            .command_message_bytes_total
458            .for_proto_command(item)
459            .inc_by(u64::cast_from(size));
460    }
461
462    fn receive_event(&self, item: &ProtoComputeResponse, size: usize) {
463        self.inner.responses_total.for_proto_response(item).inc();
464        self.inner
465            .response_message_bytes_total
466            .for_proto_response(item)
467            .inc_by(u64::cast_from(size));
468    }
469}
470
471/// Per-replica-and-collection metrics.
472#[derive(Debug)]
473pub(crate) struct ReplicaCollectionMetrics {
474    /// Metrics tracking dataflow wallclock lag.
475    pub wallclock_lag: WallclockLagMetrics,
476}
477
/// A set of metrics of type `M`, one for each `ComputeCommand` type.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metric for `CreateTimely` commands.
    pub create_timely: M,
    /// Metric for `CreateInstance` commands.
    pub create_instance: M,
    /// Metric for `CreateDataflow` commands.
    pub create_dataflow: M,
    /// Metric for `Schedule` commands.
    pub schedule: M,
    /// Metric for `AllowCompaction` commands.
    pub allow_compaction: M,
    /// Metric for `Peek` commands.
    pub peek: M,
    /// Metric for `CancelPeek` commands.
    pub cancel_peek: M,
    /// Metric for `InitializationComplete` commands.
    pub initialization_complete: M,
    /// Metric for `UpdateConfiguration` commands.
    pub update_configuration: M,
    /// Metric for `AllowWrites` commands.
    pub allow_writes: M,
}
502
503impl<M> CommandMetrics<M> {
504    /// TODO(database-issues#7533): Add documentation.
505    pub fn build<F>(build_metric: F) -> Self
506    where
507        F: Fn(&str) -> M,
508    {
509        Self {
510            create_timely: build_metric("create_timely"),
511            create_instance: build_metric("create_instance"),
512            create_dataflow: build_metric("create_dataflow"),
513            schedule: build_metric("schedule"),
514            allow_compaction: build_metric("allow_compaction"),
515            peek: build_metric("peek"),
516            cancel_peek: build_metric("cancel_peek"),
517            initialization_complete: build_metric("initialization_complete"),
518            update_configuration: build_metric("update_configuration"),
519            allow_writes: build_metric("allow_writes"),
520        }
521    }
522
523    fn for_all<F>(&self, f: F)
524    where
525        F: Fn(&M),
526    {
527        f(&self.create_timely);
528        f(&self.create_instance);
529        f(&self.initialization_complete);
530        f(&self.update_configuration);
531        f(&self.create_dataflow);
532        f(&self.schedule);
533        f(&self.allow_compaction);
534        f(&self.peek);
535        f(&self.cancel_peek);
536    }
537
538    /// TODO(database-issues#7533): Add documentation.
539    pub fn for_command<T>(&self, command: &ComputeCommand<T>) -> &M {
540        use ComputeCommand::*;
541
542        match command {
543            CreateTimely { .. } => &self.create_timely,
544            CreateInstance(_) => &self.create_instance,
545            InitializationComplete => &self.initialization_complete,
546            UpdateConfiguration(_) => &self.update_configuration,
547            CreateDataflow(_) => &self.create_dataflow,
548            Schedule(_) => &self.schedule,
549            AllowCompaction { .. } => &self.allow_compaction,
550            Peek(_) => &self.peek,
551            CancelPeek { .. } => &self.cancel_peek,
552            AllowWrites { .. } => &self.allow_writes,
553        }
554    }
555
556    fn for_proto_command(&self, proto: &ProtoComputeCommand) -> &M {
557        use crate::protocol::command::proto_compute_command::Kind::*;
558
559        match proto.kind.as_ref().unwrap() {
560            CreateTimely(_) => &self.create_timely,
561            CreateInstance(_) => &self.create_instance,
562            CreateDataflow(_) => &self.create_dataflow,
563            Schedule(_) => &self.schedule,
564            AllowCompaction(_) => &self.allow_compaction,
565            Peek(_) => &self.peek,
566            CancelPeek(_) => &self.cancel_peek,
567            InitializationComplete(_) => &self.initialization_complete,
568            UpdateConfiguration(_) => &self.update_configuration,
569            AllowWrites(_) => &self.allow_writes,
570        }
571    }
572}
573
/// A set of metrics of type `M`, one for each `ComputeResponse` type.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metric for `Frontiers` responses.
    frontiers: M,
    /// Metric for `PeekResponse` responses.
    peek_response: M,
    /// Metric for `SubscribeResponse` responses.
    subscribe_response: M,
    /// Metric for `CopyToResponse` responses.
    copy_to_response: M,
    /// Metric for `Status` responses.
    status: M,
}
583
584impl<M> ResponseMetrics<M> {
585    fn build<F>(build_metric: F) -> Self
586    where
587        F: Fn(&str) -> M,
588    {
589        Self {
590            frontiers: build_metric("frontiers"),
591            peek_response: build_metric("peek_response"),
592            subscribe_response: build_metric("subscribe_response"),
593            copy_to_response: build_metric("copy_to_response"),
594            status: build_metric("status"),
595        }
596    }
597
598    fn for_proto_response(&self, proto: &ProtoComputeResponse) -> &M {
599        use crate::protocol::response::proto_compute_response::Kind::*;
600
601        match proto.kind.as_ref().unwrap() {
602            Frontiers(_) => &self.frontiers,
603            PeekResponse(_) => &self.peek_response,
604            SubscribeResponse(_) => &self.subscribe_response,
605            CopyToResponse(_) => &self.copy_to_response,
606            Status(_) => &self.status,
607        }
608    }
609}
610
611/// Metrics tracked by the command history.
612#[derive(Debug)]
613pub struct HistoryMetrics<G> {
614    /// Metrics tracking command counts.
615    pub command_counts: CommandMetrics<G>,
616    /// Metric tracking the dataflow count.
617    pub dataflow_count: G,
618}
619
620impl<G> HistoryMetrics<G>
621where
622    G: Borrow<mz_ore::metrics::UIntGauge>,
623{
624    /// Reset all tracked counts to 0.
625    pub fn reset(&self) {
626        self.command_counts.for_all(|m| m.borrow().set(0));
627        self.dataflow_count.borrow().set(0);
628    }
629}
630
/// A set of metrics of type `M` for finished peeks, one for each peek result.
#[derive(Debug)]
pub struct PeekMetrics<M> {
    /// Metric for peeks that finished with `PeekResponse::Rows`.
    rows: M,
    /// Metric for peeks that finished with `PeekResponse::Error`.
    error: M,
    /// Metric for peeks that finished with `PeekResponse::Canceled`.
    canceled: M,
}
638
639impl<M> PeekMetrics<M> {
640    fn build<F>(build_metric: F) -> Self
641    where
642        F: Fn(&str) -> M,
643    {
644        Self {
645            rows: build_metric("rows"),
646            error: build_metric("error"),
647            canceled: build_metric("canceled"),
648        }
649    }
650
651    fn for_peek_response(&self, response: &PeekResponse) -> &M {
652        use PeekResponse::*;
653
654        match response {
655            Rows(_) => &self.rows,
656            Error(_) => &self.error,
657            Canceled => &self.canceled,
658        }
659    }
660}