Skip to main content

mz_storage/metrics/
source.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! "Base" metrics used by all dataflow sources.
11//!
//! We label metrics by the concrete source that emits them (which makes these metrics
//! ineligible for ingestion by third parties). As a result, we register the metric vectors
//! with the registry once, and then create concrete instantiations of them for each
//! appropriate source.
16
17use mz_ore::metric;
18use mz_ore::metrics::{
19    DeleteOnDropCounter, DeleteOnDropGauge, IntCounter, IntCounterVec, IntGaugeVec,
20    MetricVisibility, MetricsRegistry, UIntGaugeVec,
21};
22use mz_repr::GlobalId;
23use prometheus::core::{AtomicI64, AtomicU64};
24
25pub mod kafka;
26pub mod mysql;
27pub mod postgres;
28pub mod sql_server;
29
30/// Definitions for general metrics about sources that are not specific to the source type.
31///
32/// Registers metrics for
33/// `SourceMetrics`, `OffsetCommitMetrics`, and `SourcePersistSinkMetrics`.
34#[derive(Clone, Debug)]
35pub(crate) struct GeneralSourceMetricDefs {
36    // Source metrics
37    pub(crate) resume_upper: IntGaugeVec,
38    pub(crate) commit_upper_ready_times: UIntGaugeVec,
39    pub(crate) commit_upper_accepted_times: UIntGaugeVec,
40
41    // OffsetCommitMetrics
42    pub(crate) offset_commit_failures: IntCounterVec,
43
44    // `persist_sink` metrics
45    /// A timestamp gauge representing forward progress
46    /// in the data shard.
47    pub(crate) progress: IntGaugeVec,
48    pub(crate) row_inserts: IntCounterVec,
49    pub(crate) row_retractions: IntCounterVec,
50    pub(crate) error_inserts: IntCounterVec,
51    pub(crate) error_retractions: IntCounterVec,
52    pub(crate) persist_sink_processed_batches: IntCounterVec,
53}
54
55impl GeneralSourceMetricDefs {
56    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
57        Self {
58            // TODO(guswynn): some of these metrics are not clear when subsources are involved, and
59            // should be fixed
60            resume_upper: registry.register(metric!(
61                name: "mz_resume_upper",
62                // TODO(guswynn): should this also track the resumption frontier operator?
63                help: "The timestamp-domain resumption frontier chosen for a source's ingestion",
64                var_labels: ["source_id"],
65            )),
66            commit_upper_ready_times: registry.register(metric!(
67                name: "mz_source_commit_upper_ready_times",
68                help: "The number of ready remap bindings that are held in the reclock commit upper operator.",
69                var_labels: ["source_id", "worker_id"],
70            )),
71            commit_upper_accepted_times: registry.register(metric!(
72                name: "mz_source_commit_upper_accepted_times",
73                help: "The number of accepted remap bindings that are held in the reclock commit upper operator.",
74                var_labels: ["source_id", "worker_id"],
75            )),
76            offset_commit_failures: registry.register(metric!(
77                name: "mz_source_offset_commit_failures",
78                help: "A counter representing how many times we have failed to commit offsets for a source",
79                var_labels: ["source_id"],
80                visibility: MetricVisibility::Public,
81            )),
82            progress: registry.register(metric!(
83                name: "mz_source_progress",
84                help: "A timestamp gauge representing forward progess in the data shard",
85                var_labels: ["source_id", "shard", "worker_id"],
86            )),
87            row_inserts: registry.register(metric!(
88                name: "mz_source_row_inserts",
89                help: "A counter representing the actual number of rows being inserted to the data shard",
90                var_labels: ["source_id", "shard", "worker_id"],
91            )),
92            row_retractions: registry.register(metric!(
93                name: "mz_source_row_retractions",
94                help: "A counter representing the actual number of rows being retracted from the data shard",
95                var_labels: ["source_id", "shard", "worker_id"],
96            )),
97            error_inserts: registry.register(metric!(
98                name: "mz_source_error_inserts",
99                help: "A counter representing the actual number of errors being inserted to the data shard",
100                var_labels: ["source_id", "shard", "worker_id"],
101            )),
102            error_retractions: registry.register(metric!(
103                name: "mz_source_error_retractions",
104                help: "A counter representing the actual number of errors being retracted from the data shard",
105                var_labels: ["source_id", "shard", "worker_id"],
106            )),
107            persist_sink_processed_batches: registry.register(metric!(
108                name: "mz_source_processed_batches",
109                help: "A counter representing the number of persist sink batches with actual data \
110                we have successfully processed.",
111                var_labels: ["source_id", "shard", "worker_id"],
112            )),
113        }
114    }
115}
116
117/// General metrics about sources that are not specific to the source type
118pub(crate) struct SourceMetrics {
119    /// The resume_upper for a source.
120    pub(crate) resume_upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
121    /// The number of ready remap bindings that are held in the reclock commit upper operator.
122    pub(crate) commit_upper_ready_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
123    /// The number of accepted remap bindings that are held in the reclock commit upper operator.
124    pub(crate) commit_upper_accepted_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
125}
126
127impl SourceMetrics {
128    /// Initializes source metrics for a given (source_id, worker_id)
129    pub(crate) fn new(
130        defs: &GeneralSourceMetricDefs,
131        source_id: GlobalId,
132        worker_id: usize,
133    ) -> SourceMetrics {
134        SourceMetrics {
135            resume_upper: defs
136                .resume_upper
137                .get_delete_on_drop_metric(vec![source_id.to_string()]),
138            commit_upper_ready_times: defs
139                .commit_upper_ready_times
140                .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
141            commit_upper_accepted_times: defs
142                .commit_upper_accepted_times
143                .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
144        }
145    }
146}
147
148/// Source-specific metrics used by the `persist_sink`
149pub(crate) struct SourcePersistSinkMetrics {
150    pub(crate) progress: DeleteOnDropGauge<AtomicI64, Vec<String>>,
151    pub(crate) row_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
152    pub(crate) row_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
153    pub(crate) error_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
154    pub(crate) error_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
155    pub(crate) processed_batches: DeleteOnDropCounter<AtomicU64, Vec<String>>,
156}
157
158impl SourcePersistSinkMetrics {
159    /// Initializes source metrics used in the `persist_sink`.
160    pub(crate) fn new(
161        defs: &GeneralSourceMetricDefs,
162        _source_id: GlobalId,
163        parent_source_id: GlobalId,
164        worker_id: usize,
165        shard_id: &mz_persist_client::ShardId,
166    ) -> SourcePersistSinkMetrics {
167        let shard = shard_id.to_string();
168        SourcePersistSinkMetrics {
169            progress: defs.progress.get_delete_on_drop_metric(vec![
170                parent_source_id.to_string(),
171                shard.clone(),
172                worker_id.to_string(),
173            ]),
174            row_inserts: defs.row_inserts.get_delete_on_drop_metric(vec![
175                parent_source_id.to_string(),
176                shard.clone(),
177                worker_id.to_string(),
178            ]),
179            row_retractions: defs.row_retractions.get_delete_on_drop_metric(vec![
180                parent_source_id.to_string(),
181                shard.clone(),
182                worker_id.to_string(),
183            ]),
184            error_inserts: defs.error_inserts.get_delete_on_drop_metric(vec![
185                parent_source_id.to_string(),
186                shard.clone(),
187                worker_id.to_string(),
188            ]),
189            error_retractions: defs.error_retractions.get_delete_on_drop_metric(vec![
190                parent_source_id.to_string(),
191                shard.clone(),
192                worker_id.to_string(),
193            ]),
194            processed_batches: defs
195                .persist_sink_processed_batches
196                .get_delete_on_drop_metric(vec![
197                    parent_source_id.to_string(),
198                    shard,
199                    worker_id.to_string(),
200                ]),
201        }
202    }
203}
204
205/// Metrics about committing offsets.
206pub(crate) struct OffsetCommitMetrics {
207    /// The offset-domain resume_upper for a source.
208    pub(crate) offset_commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
209}
210
211impl OffsetCommitMetrics {
212    /// Initialises partition metrics for a given (source_id, partition_id)
213    pub(crate) fn new(defs: &GeneralSourceMetricDefs, source_id: GlobalId) -> OffsetCommitMetrics {
214        OffsetCommitMetrics {
215            offset_commit_failures: defs
216                .offset_commit_failures
217                .get_delete_on_drop_metric(vec![source_id.to_string()]),
218        }
219    }
220}
221
222/// A set of base metrics that hang off a central metrics registry, labeled by the source they
223/// belong to.
224#[derive(Debug, Clone)]
225pub(crate) struct SourceMetricDefs {
226    pub(crate) source_defs: GeneralSourceMetricDefs,
227    pub(crate) postgres_defs: postgres::PgSourceMetricDefs,
228    pub(crate) mysql_defs: mysql::MySqlSourceMetricDefs,
229    pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
230    pub(crate) sql_server_defs: sql_server::SqlServerSourceMetricDefs,
231    /// A cluster-wide counter shared across all sources.
232    pub(crate) bytes_read: IntCounter,
233}
234
235impl SourceMetricDefs {
236    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
237        Self {
238            source_defs: GeneralSourceMetricDefs::register_with(registry),
239            postgres_defs: postgres::PgSourceMetricDefs::register_with(registry),
240            mysql_defs: mysql::MySqlSourceMetricDefs::register_with(registry),
241            kafka_source_defs: kafka::KafkaSourceMetricDefs::register_with(registry),
242            sql_server_defs: sql_server::SqlServerSourceMetricDefs::register_with(registry),
243            bytes_read: registry.register(metric!(
244                name: "mz_bytes_read_total",
245                help: "Count of bytes read from sources",
246            )),
247        }
248    }
249}