mz_storage/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for all things storage.
11//!
12//! The structure of this module is designed to make adding new metrics as easy as possible. The
13//! structure and naming conventions are as follows:
14//!
//! Metrics for X end up in the `x.rs` submodule, unless X fits into one of the existing
//! submodules. The struct `XMetricDefs` defines the `CounterVec`s, `GaugeVec`s, etc. that must
//! be registered with the `MetricsRegistry` to create new metrics. `XMetricDefs` should be a
//! field of `StorageMetrics` (directly, or nested inside another field). `XMetricDefs` has a
//! `register_with` function to create it using a `MetricsRegistry`.
//!
//! `XMetrics` contains the actual gauges/counters/etc. that are created using `XMetricDefs`.
//! Typically these are created with `new` functions that take a `&XMetricDefs`, a `GlobalId`,
//! and a worker id, but sometimes more complex schemes are used for metrics that are globally
//! shared or have some other shape to their labels.
25//!
//! `StorageMetrics` is the main entry-point to this module and, for convenience, typically
//! provides a `get_x_metrics` method to obtain an `XMetrics` struct. This keeps users from
//! having to pass metric _definitions_ into the code that actually bumps those metrics.
30
31use std::sync::Arc;
32
33use mz_ore::metrics::MetricsRegistry;
34use mz_repr::GlobalId;
35
36use crate::statistics::{SinkStatisticsMetricDefs, SourceStatisticsMetricDefs};
37use mz_storage_operators::metrics::BackpressureMetrics;
38
39pub mod decode;
40pub mod sink;
41pub mod source;
42pub mod upsert;
43
44/// A top-level struct holding all various _definitions_ of all metrics
45/// use by the `mz-storage` crate.
46///
47/// Created by registering it with a `MetricsRegistry`, it also provides helpers
48/// to obtain various _instantiated_ time-series, either per-worker, shared globally,
49/// or some more specific labeling scheme.
50///
51/// This struct can be cloned, and the various definitions are shared.
52#[derive(Clone, Debug)]
53pub struct StorageMetrics {
54    pub(crate) source_defs: source::SourceMetricDefs,
55    pub(crate) decode_defs: decode::DecodeMetricDefs,
56    pub(crate) upsert_defs: upsert::UpsertMetricDefs,
57    pub(crate) upsert_backpressure_defs: upsert::UpsertBackpressureMetricDefs,
58    pub(crate) sink_defs: sink::SinkMetricDefs,
59
60    // Defined in the `statistics` module, as they are kept in sync with
61    // user-facing data.
62    pub(crate) source_statistics: SourceStatisticsMetricDefs,
63    pub(crate) sink_statistics: SinkStatisticsMetricDefs,
64}
65
66impl StorageMetrics {
67    /// Register all metrics with the `MetricsRegistry`.
68    pub fn register_with(registry: &MetricsRegistry) -> Self {
69        Self {
70            source_defs: source::SourceMetricDefs::register_with(registry),
71            decode_defs: decode::DecodeMetricDefs::register_with(registry),
72            upsert_defs: upsert::UpsertMetricDefs::register_with(registry),
73            upsert_backpressure_defs: upsert::UpsertBackpressureMetricDefs::register_with(registry),
74            sink_defs: sink::SinkMetricDefs::register_with(registry),
75            source_statistics: SourceStatisticsMetricDefs::register_with(registry),
76            sink_statistics: SinkStatisticsMetricDefs::register_with(registry),
77        }
78    }
79
80    /// Get a `BackpressureMetrics` for the given id and worker id.
81    pub(crate) fn get_backpressure_metrics(
82        &self,
83        id: GlobalId,
84        index: usize,
85    ) -> BackpressureMetrics {
86        BackpressureMetrics {
87            emitted_bytes: Arc::new(
88                self.upsert_backpressure_defs
89                    .emitted_bytes
90                    .get_delete_on_drop_metric(vec![id.to_string(), index.to_string()]),
91            ),
92            last_backpressured_bytes: Arc::new(
93                self.upsert_backpressure_defs
94                    .last_backpressured_bytes
95                    .get_delete_on_drop_metric(vec![id.to_string(), index.to_string()]),
96            ),
97            retired_bytes: Arc::new(
98                self.upsert_backpressure_defs
99                    .retired_bytes
100                    .get_delete_on_drop_metric(vec![id.to_string(), index.to_string()]),
101            ),
102        }
103    }
104
105    /// Get an `UpsertMetrics` for the given id and worker id (and optional `BackpressureMetrics`).
106    pub(crate) fn get_upsert_metrics(
107        &self,
108        id: GlobalId,
109        worker_id: usize,
110        backpressure_metrics: Option<BackpressureMetrics>,
111    ) -> upsert::UpsertMetrics {
112        upsert::UpsertMetrics::new(&self.upsert_defs, id, worker_id, backpressure_metrics)
113    }
114
115    /// Get a `SourcePersistSinkMetrics` for the given configuration.
116    pub(crate) fn get_source_persist_sink_metrics(
117        &self,
118        export_id: GlobalId,
119        primary_source_id: GlobalId,
120        worker_id: usize,
121        data_shard: &mz_persist_client::ShardId,
122    ) -> source::SourcePersistSinkMetrics {
123        source::SourcePersistSinkMetrics::new(
124            &self.source_defs.source_defs,
125            export_id,
126            primary_source_id,
127            worker_id,
128            data_shard,
129        )
130    }
131
132    /// Get a `SourceMetrics` for the given id and worker id.
133    pub(crate) fn get_source_metrics(
134        &self,
135        id: GlobalId,
136        worker_id: usize,
137    ) -> source::SourceMetrics {
138        source::SourceMetrics::new(&self.source_defs.source_defs, id, worker_id)
139    }
140
141    /// Get a `PgMetrics` for the given id.
142    pub(crate) fn get_postgres_source_metrics(
143        &self,
144        id: GlobalId,
145    ) -> source::postgres::PgSourceMetrics {
146        source::postgres::PgSourceMetrics::new(&self.source_defs.postgres_defs, id)
147    }
148
149    /// Get a `MySqlSourceMetrics` for the given id.
150    pub(crate) fn get_mysql_source_metrics(
151        &self,
152        id: GlobalId,
153    ) -> source::mysql::MySqlSourceMetrics {
154        source::mysql::MySqlSourceMetrics::new(&self.source_defs.mysql_defs, id)
155    }
156
157    /// Get an `OffsetCommitMetrics` for the given id.
158    pub(crate) fn get_offset_commit_metrics(&self, id: GlobalId) -> source::OffsetCommitMetrics {
159        source::OffsetCommitMetrics::new(&self.source_defs.source_defs, id)
160    }
161
162    /// Get an `KafkaSourceMetrics` for the given configuration.
163    pub(crate) fn get_kafka_source_metrics(
164        &self,
165        ids: Vec<i32>,
166        topic: String,
167        source_id: GlobalId,
168    ) -> source::kafka::KafkaSourceMetrics {
169        source::kafka::KafkaSourceMetrics::new(
170            &self.source_defs.kafka_source_defs,
171            ids,
172            topic,
173            source_id,
174        )
175    }
176
177    /// Get an `KafkaSinkMetrics` for the given configuration.
178    pub(crate) fn get_kafka_sink_metrics(
179        &self,
180        sink_id: GlobalId,
181    ) -> sink::kafka::KafkaSinkMetrics {
182        sink::kafka::KafkaSinkMetrics::new(&self.sink_defs.kafka_defs, sink_id)
183    }
184}