mz_storage/metrics/source.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! "Base" metrics used by all dataflow sources.
//!
//! We label metrics by the concrete source they are emitted from (which makes these metrics
//! ineligible for ingestion by third parties), so we have to register the metric vectors with the
//! registry once and then create concrete instantiations of them for the appropriate source.
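//!
//! A minimal sketch of that pattern (hypothetical call site; the `registry`, `source_id`, and
//! `worker_id` values are assumed to be supplied by the caller):
//!
//! ```ignore
//! // Register the labeled metric vectors exactly once per registry...
//! let defs = GeneralSourceMetricDefs::register_with(&registry);
//! // ...then mint concrete, label-scoped instances for each (source, worker).
//! let metrics = SourceMetrics::new(&defs, source_id, worker_id);
//! metrics.resume_upper.set(0);
//! ```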

use mz_ore::metric;
use mz_ore::metrics::{
    DeleteOnDropCounter, DeleteOnDropGauge, IntCounter, IntCounterVec, IntGaugeVec,
    MetricsRegistry, UIntGaugeVec,
};
use mz_repr::GlobalId;
use prometheus::core::{AtomicI64, AtomicU64};

pub mod kafka;
pub mod mysql;
pub mod postgres;
pub mod sql_server;

/// Definitions for general metrics about sources that are not specific to the source type.
///
/// Registers metrics for `SourceMetrics`, `OffsetCommitMetrics`, and `SourcePersistSinkMetrics`.
#[derive(Clone, Debug)]
pub(crate) struct GeneralSourceMetricDefs {
    // Source metrics
    pub(crate) resume_upper: IntGaugeVec,
    pub(crate) commit_upper_ready_times: UIntGaugeVec,
    pub(crate) commit_upper_accepted_times: UIntGaugeVec,

    // OffsetCommitMetrics
    pub(crate) offset_commit_failures: IntCounterVec,

    // `persist_sink` metrics
    /// A timestamp gauge representing forward progress
    /// in the data shard.
    pub(crate) progress: IntGaugeVec,
    pub(crate) row_inserts: IntCounterVec,
    pub(crate) row_retractions: IntCounterVec,
    pub(crate) error_inserts: IntCounterVec,
    pub(crate) error_retractions: IntCounterVec,
    pub(crate) persist_sink_processed_batches: IntCounterVec,
}

impl GeneralSourceMetricDefs {
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            // TODO(guswynn): some of these metrics are not clear when subsources are involved, and
            // should be fixed
            resume_upper: registry.register(metric!(
                name: "mz_resume_upper",
                // TODO(guswynn): should this also track the resumption frontier operator?
                help: "The timestamp-domain resumption frontier chosen for a source's ingestion",
                var_labels: ["source_id"],
            )),
            commit_upper_ready_times: registry.register(metric!(
                name: "mz_source_commit_upper_ready_times",
                help: "The number of ready remap bindings that are held in the reclock commit upper operator.",
                var_labels: ["source_id", "worker_id"],
            )),
            commit_upper_accepted_times: registry.register(metric!(
                name: "mz_source_commit_upper_accepted_times",
                help: "The number of accepted remap bindings that are held in the reclock commit upper operator.",
                var_labels: ["source_id", "worker_id"],
            )),
            offset_commit_failures: registry.register(metric!(
                name: "mz_source_offset_commit_failures",
                help: "A counter representing how many times we have failed to commit offsets for a source",
                var_labels: ["source_id"],
            )),
            progress: registry.register(metric!(
                name: "mz_source_progress",
                help: "A timestamp gauge representing forward progress in the data shard",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
            row_inserts: registry.register(metric!(
                name: "mz_source_row_inserts",
                help: "A counter representing the actual number of rows being inserted to the data shard",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
            row_retractions: registry.register(metric!(
                name: "mz_source_row_retractions",
                help: "A counter representing the actual number of rows being retracted from the data shard",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
            error_inserts: registry.register(metric!(
                name: "mz_source_error_inserts",
                help: "A counter representing the actual number of errors being inserted to the data shard",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
            error_retractions: registry.register(metric!(
                name: "mz_source_error_retractions",
                help: "A counter representing the actual number of errors being retracted from the data shard",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
            persist_sink_processed_batches: registry.register(metric!(
                name: "mz_source_processed_batches",
                help: "A counter representing the number of persist sink batches with actual data \
                we have successfully processed.",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
        }
    }
}

/// General metrics about sources that are not specific to the source type
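///
/// Each field is a `DeleteOnDrop*` handle, so dropping this struct removes its labeled series
/// from the underlying metric vectors. A sketch of the intended lifecycle (hypothetical call
/// site; `defs`, `source_id`, and `worker_id` are assumed to exist):
///
/// ```ignore
/// let metrics = SourceMetrics::new(&defs, source_id, worker_id);
/// metrics.commit_upper_ready_times.set(3);
/// // Dropping the struct unregisters the per-(source, worker) series.
/// drop(metrics);
/// ```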
pub(crate) struct SourceMetrics {
    /// The resume_upper for a source.
    pub(crate) resume_upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The number of ready remap bindings that are held in the reclock commit upper operator.
    pub(crate) commit_upper_ready_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// The number of accepted remap bindings that are held in the reclock commit upper operator.
    pub(crate) commit_upper_accepted_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}

impl SourceMetrics {
    /// Initializes source metrics for a given (source_id, worker_id)
    pub(crate) fn new(
        defs: &GeneralSourceMetricDefs,
        source_id: GlobalId,
        worker_id: usize,
    ) -> SourceMetrics {
        SourceMetrics {
            resume_upper: defs
                .resume_upper
                .get_delete_on_drop_metric(vec![source_id.to_string()]),
            commit_upper_ready_times: defs
                .commit_upper_ready_times
                .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
            commit_upper_accepted_times: defs
                .commit_upper_accepted_times
                .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
        }
    }
}

/// Source-specific metrics used by the `persist_sink`
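///
/// These series are labeled by the parent source id, the data shard, and the worker id. A sketch
/// of the intended use (hypothetical call site; all inputs are assumed to be supplied by the
/// caller):
///
/// ```ignore
/// let metrics =
///     SourcePersistSinkMetrics::new(&defs, source_id, parent_source_id, worker_id, &shard_id);
/// metrics.processed_batches.inc();
/// ```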
pub(crate) struct SourcePersistSinkMetrics {
    pub(crate) progress: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    pub(crate) row_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) row_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) error_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) error_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) processed_batches: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}

impl SourcePersistSinkMetrics {
    /// Initializes source metrics used in the `persist_sink`.
    pub(crate) fn new(
        defs: &GeneralSourceMetricDefs,
        _source_id: GlobalId,
        parent_source_id: GlobalId,
        worker_id: usize,
        shard_id: &mz_persist_client::ShardId,
    ) -> SourcePersistSinkMetrics {
        let shard = shard_id.to_string();
        SourcePersistSinkMetrics {
            progress: defs.progress.get_delete_on_drop_metric(vec![
                parent_source_id.to_string(),
                shard.clone(),
                worker_id.to_string(),
            ]),
            row_inserts: defs.row_inserts.get_delete_on_drop_metric(vec![
                parent_source_id.to_string(),
                shard.clone(),
                worker_id.to_string(),
            ]),
            row_retractions: defs.row_retractions.get_delete_on_drop_metric(vec![
                parent_source_id.to_string(),
                shard.clone(),
                worker_id.to_string(),
            ]),
            error_inserts: defs.error_inserts.get_delete_on_drop_metric(vec![
                parent_source_id.to_string(),
                shard.clone(),
                worker_id.to_string(),
            ]),
            error_retractions: defs.error_retractions.get_delete_on_drop_metric(vec![
                parent_source_id.to_string(),
                shard.clone(),
                worker_id.to_string(),
            ]),
            processed_batches: defs
                .persist_sink_processed_batches
                .get_delete_on_drop_metric(vec![
                    parent_source_id.to_string(),
                    shard,
                    worker_id.to_string(),
                ]),
        }
    }
}

/// Metrics about committing offsets.
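///
/// A sketch of the intended use (hypothetical call site; `defs`, `source_id`, and
/// `commit_result` are assumed to exist):
///
/// ```ignore
/// let metrics = OffsetCommitMetrics::new(&defs, source_id);
/// if commit_result.is_err() {
///     metrics.offset_commit_failures.inc();
/// }
/// ```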
pub(crate) struct OffsetCommitMetrics {
    /// A counter of how many times committing offsets for a source has failed.
    pub(crate) offset_commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}

impl OffsetCommitMetrics {
    /// Initializes offset commit metrics for a given `source_id`.
    pub(crate) fn new(defs: &GeneralSourceMetricDefs, source_id: GlobalId) -> OffsetCommitMetrics {
        OffsetCommitMetrics {
            offset_commit_failures: defs
                .offset_commit_failures
                .get_delete_on_drop_metric(vec![source_id.to_string()]),
        }
    }
}

/// A set of base metrics that hang off a central metrics registry, labeled by the source they
/// belong to.
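///
/// A sketch of how this bundle is created once at startup (hypothetical call site; assumes the
/// registry is obtained via `MetricsRegistry::new()` in context):
///
/// ```ignore
/// let registry = MetricsRegistry::new();
/// let defs = SourceMetricDefs::register_with(&registry);
/// defs.bytes_read.inc_by(1024);
/// ```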
#[derive(Debug, Clone)]
pub(crate) struct SourceMetricDefs {
    pub(crate) source_defs: GeneralSourceMetricDefs,
    pub(crate) postgres_defs: postgres::PgSourceMetricDefs,
    pub(crate) mysql_defs: mysql::MySqlSourceMetricDefs,
    pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
    pub(crate) sql_server_defs: sql_server::SqlServerSourceMetricDefs,
    /// A cluster-wide counter shared across all sources.
    pub(crate) bytes_read: IntCounter,
}

impl SourceMetricDefs {
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            source_defs: GeneralSourceMetricDefs::register_with(registry),
            postgres_defs: postgres::PgSourceMetricDefs::register_with(registry),
            mysql_defs: mysql::MySqlSourceMetricDefs::register_with(registry),
            kafka_source_defs: kafka::KafkaSourceMetricDefs::register_with(registry),
            sql_server_defs: sql_server::SqlServerSourceMetricDefs::register_with(registry),
            bytes_read: registry.register(metric!(
                name: "mz_bytes_read_total",
                help: "Count of bytes read from sources",
            )),
        }
    }
}