mz_storage/metrics/source.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! "Base" metrics used by all dataflow sources.
//!
//! We label metrics by the concrete source that emits them (which makes these metrics
//! ineligible for ingestion by third parties), so we register the metric vectors with the
//! registry once and then generate concrete instantiations of them for the appropriate
//! source.
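//!
//! A minimal sketch of the intended pattern, assuming a `MetricsRegistry`, a source's
//! `GlobalId`, and a worker index are already in hand (marked `ignore` because the types
//! here are crate-private):
//!
//! ```ignore
//! // Register the general metric vectors with the registry exactly once per process.
//! let defs = GeneralSourceMetricDefs::register_with(&registry);
//!
//! // Then create labeled, delete-on-drop instantiations for a particular source and worker.
//! let metrics = SourceMetrics::new(&defs, source_id, worker_id);
//! metrics.resume_upper.set(0);
//! ```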

use mz_ore::metric;
use mz_ore::metrics::{
    DeleteOnDropCounter, DeleteOnDropGauge, IntCounter, IntCounterVec, IntGaugeVec,
    MetricsRegistry, UIntGaugeVec,
};
use mz_repr::GlobalId;
use prometheus::core::{AtomicI64, AtomicU64};

pub mod kafka;
pub mod mysql;
pub mod postgres;

/// Definitions for general metrics about sources that are not specific to the source type.
///
/// Holds the registered metric vectors from which `SourceMetrics`, `OffsetCommitMetrics`,
/// and `SourcePersistSinkMetrics` are instantiated.
#[derive(Clone, Debug)]
pub(crate) struct GeneralSourceMetricDefs {
    // Source metrics
    pub(crate) resume_upper: IntGaugeVec,
    pub(crate) commit_upper_ready_times: UIntGaugeVec,
    pub(crate) commit_upper_accepted_times: UIntGaugeVec,

    // OffsetCommitMetrics
    pub(crate) offset_commit_failures: IntCounterVec,

    // `persist_sink` metrics
    /// A timestamp gauge representing forward progress
    /// in the data shard.
    pub(crate) progress: IntGaugeVec,
    pub(crate) row_inserts: IntCounterVec,
    pub(crate) row_retractions: IntCounterVec,
    pub(crate) error_inserts: IntCounterVec,
    pub(crate) error_retractions: IntCounterVec,
    pub(crate) persist_sink_processed_batches: IntCounterVec,
}

impl GeneralSourceMetricDefs {
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            // TODO(guswynn): some of these metrics are not clear when subsources are involved, and
            // should be fixed
            resume_upper: registry.register(metric!(
                name: "mz_resume_upper",
                // TODO(guswynn): should this also track the resumption frontier operator?
                help: "The timestamp-domain resumption frontier chosen for a source's ingestion",
                var_labels: ["source_id"],
            )),
            commit_upper_ready_times: registry.register(metric!(
                name: "mz_source_commit_upper_ready_times",
                help: "The number of ready remap bindings that are held in the reclock commit upper operator.",
                var_labels: ["source_id", "worker_id"],
            )),
            commit_upper_accepted_times: registry.register(metric!(
                name: "mz_source_commit_upper_accepted_times",
                help: "The number of accepted remap bindings that are held in the reclock commit upper operator.",
                var_labels: ["source_id", "worker_id"],
            )),
            offset_commit_failures: registry.register(metric!(
                name: "mz_source_offset_commit_failures",
                help: "A counter representing how many times we have failed to commit offsets for a source",
                var_labels: ["source_id"],
            )),
            progress: registry.register(metric!(
                name: "mz_source_progress",
                help: "A timestamp gauge representing forward progress in the data shard",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
            row_inserts: registry.register(metric!(
                name: "mz_source_row_inserts",
                help: "A counter representing the actual number of rows being inserted to the data shard",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
            row_retractions: registry.register(metric!(
                name: "mz_source_row_retractions",
                help: "A counter representing the actual number of rows being retracted from the data shard",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
            error_inserts: registry.register(metric!(
                name: "mz_source_error_inserts",
                help: "A counter representing the actual number of errors being inserted to the data shard",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
            error_retractions: registry.register(metric!(
                name: "mz_source_error_retractions",
                help: "A counter representing the actual number of errors being retracted from the data shard",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
            persist_sink_processed_batches: registry.register(metric!(
                name: "mz_source_processed_batches",
                help: "A counter representing the number of persist sink batches with actual data \
                we have successfully processed.",
                var_labels: ["source_id", "shard", "worker_id"],
            )),
        }
    }
}

/// General metrics about sources that are not specific to the source type
pub(crate) struct SourceMetrics {
    /// The resume_upper for a source.
    pub(crate) resume_upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The number of ready remap bindings that are held in the reclock commit upper operator.
    pub(crate) commit_upper_ready_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// The number of accepted remap bindings that are held in the reclock commit upper operator.
    pub(crate) commit_upper_accepted_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}

impl SourceMetrics {
    /// Initializes source metrics for a given (source_id, worker_id)
    pub(crate) fn new(
        defs: &GeneralSourceMetricDefs,
        source_id: GlobalId,
        worker_id: usize,
    ) -> SourceMetrics {
        SourceMetrics {
            resume_upper: defs
                .resume_upper
                .get_delete_on_drop_metric(vec![source_id.to_string()]),
            commit_upper_ready_times: defs
                .commit_upper_ready_times
                .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
            commit_upper_accepted_times: defs
                .commit_upper_accepted_times
                .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
        }
    }
}

/// Source-specific metrics used by the `persist_sink`
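///
/// A rough sketch of how these are created and used, assuming a `GeneralSourceMetricDefs`,
/// the source and parent source `GlobalId`s, a worker index, and a persist `ShardId` are
/// already available (marked `ignore`; these types are crate-private):
///
/// ```ignore
/// let metrics = SourcePersistSinkMetrics::new(
///     &defs,
///     source_id,
///     parent_source_id,
///     worker_id,
///     &shard_id,
/// );
/// // Each metric is labeled with (parent source id, shard, worker id) and is
/// // removed from the registry when dropped.
/// metrics.row_inserts.inc_by(42);
/// ```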
pub(crate) struct SourcePersistSinkMetrics {
    /// A timestamp gauge representing forward progress in the data shard.
    pub(crate) progress: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The number of rows inserted into the data shard.
    pub(crate) row_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// The number of rows retracted from the data shard.
    pub(crate) row_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// The number of errors inserted into the data shard.
    pub(crate) error_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// The number of errors retracted from the data shard.
    pub(crate) error_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// The number of persist sink batches with actual data that have been processed.
    pub(crate) processed_batches: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}

impl SourcePersistSinkMetrics {
    /// Initializes source metrics used in the `persist_sink`.
    pub(crate) fn new(
        defs: &GeneralSourceMetricDefs,
        _source_id: GlobalId,
        parent_source_id: GlobalId,
        worker_id: usize,
        shard_id: &mz_persist_client::ShardId,
    ) -> SourcePersistSinkMetrics {
        let shard = shard_id.to_string();
        SourcePersistSinkMetrics {
            progress: defs.progress.get_delete_on_drop_metric(vec![
                parent_source_id.to_string(),
                shard.clone(),
                worker_id.to_string(),
            ]),
            row_inserts: defs.row_inserts.get_delete_on_drop_metric(vec![
                parent_source_id.to_string(),
                shard.clone(),
                worker_id.to_string(),
            ]),
            row_retractions: defs.row_retractions.get_delete_on_drop_metric(vec![
                parent_source_id.to_string(),
                shard.clone(),
                worker_id.to_string(),
            ]),
            error_inserts: defs.error_inserts.get_delete_on_drop_metric(vec![
                parent_source_id.to_string(),
                shard.clone(),
                worker_id.to_string(),
            ]),
            error_retractions: defs.error_retractions.get_delete_on_drop_metric(vec![
                parent_source_id.to_string(),
                shard.clone(),
                worker_id.to_string(),
            ]),
            processed_batches: defs
                .persist_sink_processed_batches
                .get_delete_on_drop_metric(vec![
                    parent_source_id.to_string(),
                    shard,
                    worker_id.to_string(),
                ]),
        }
    }
}

/// Metrics about committing offsets.
pub(crate) struct OffsetCommitMetrics {
    /// A counter for how many times we have failed to commit offsets for a source.
    pub(crate) offset_commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}

impl OffsetCommitMetrics {
    /// Initializes offset commit metrics for a given `source_id`
    pub(crate) fn new(defs: &GeneralSourceMetricDefs, source_id: GlobalId) -> OffsetCommitMetrics {
        OffsetCommitMetrics {
            offset_commit_failures: defs
                .offset_commit_failures
                .get_delete_on_drop_metric(vec![source_id.to_string()]),
        }
    }
}

/// A set of base metrics that hang off a central metrics registry, labeled by the source they
/// belong to.
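///
/// A minimal sketch of registering the full bundle, assuming a `MetricsRegistry` is in hand
/// (marked `ignore`; the type is crate-private):
///
/// ```ignore
/// let defs = SourceMetricDefs::register_with(&registry);
/// // Per-source-type definitions live in `defs.source_defs`, `defs.postgres_defs`,
/// // `defs.mysql_defs`, and `defs.kafka_source_defs`; `bytes_read` is a plain counter
/// // shared by every source on the cluster.
/// defs.bytes_read.inc_by(1024);
/// ```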
#[derive(Debug, Clone)]
pub(crate) struct SourceMetricDefs {
    pub(crate) source_defs: GeneralSourceMetricDefs,
    pub(crate) postgres_defs: postgres::PgSourceMetricDefs,
    pub(crate) mysql_defs: mysql::MySqlSourceMetricDefs,
    pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
    /// A cluster-wide counter shared across all sources.
    pub(crate) bytes_read: IntCounter,
}

impl SourceMetricDefs {
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            source_defs: GeneralSourceMetricDefs::register_with(registry),
            postgres_defs: postgres::PgSourceMetricDefs::register_with(registry),
            mysql_defs: mysql::MySqlSourceMetricDefs::register_with(registry),
            kafka_source_defs: kafka::KafkaSourceMetricDefs::register_with(registry),
            bytes_read: registry.register(metric!(
                name: "mz_bytes_read_total",
                help: "Count of bytes read from sources",
            )),
        }
    }
}