1use mz_ore::metric;
18use mz_ore::metrics::{
19 DeleteOnDropCounter, DeleteOnDropGauge, IntCounter, IntCounterVec, IntGaugeVec,
20 MetricsRegistry, UIntGaugeVec,
21};
22use mz_repr::GlobalId;
23use prometheus::core::{AtomicI64, AtomicU64};
24
25pub mod kafka;
26pub mod mysql;
27pub mod postgres;
28pub mod sql_server;
29
#[derive(Clone, Debug)]
/// Definitions (i.e. registered metric vectors) for general, source-type-agnostic
/// source metrics. Individual labeled instances are obtained from these by the
/// `*Metrics` structs below via `get_delete_on_drop_metric`.
pub(crate) struct GeneralSourceMetricDefs {
    // Labeled by `source_id`: the chosen resumption frontier for an ingestion.
    pub(crate) resume_upper: IntGaugeVec,
    // Labeled by `source_id`, `worker_id`: counts of remap bindings held in the
    // reclock commit-upper operator (ready vs. accepted).
    pub(crate) commit_upper_ready_times: UIntGaugeVec,
    pub(crate) commit_upper_accepted_times: UIntGaugeVec,

    // Labeled by `source_id`: failures to commit offsets back upstream.
    pub(crate) offset_commit_failures: IntCounterVec,

    // The following are all labeled by `source_id`, `shard`, `worker_id` and
    // track activity of the persist sink writing the source's data shard.
    pub(crate) progress: IntGaugeVec,
    pub(crate) row_inserts: IntCounterVec,
    pub(crate) row_retractions: IntCounterVec,
    pub(crate) error_inserts: IntCounterVec,
    pub(crate) error_retractions: IntCounterVec,
    pub(crate) persist_sink_processed_batches: IntCounterVec,
}
54
55impl GeneralSourceMetricDefs {
56 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
57 Self {
58 resume_upper: registry.register(metric!(
61 name: "mz_resume_upper",
62 help: "The timestamp-domain resumption frontier chosen for a source's ingestion",
64 var_labels: ["source_id"],
65 )),
66 commit_upper_ready_times: registry.register(metric!(
67 name: "mz_source_commit_upper_ready_times",
68 help: "The number of ready remap bindings that are held in the reclock commit upper operator.",
69 var_labels: ["source_id", "worker_id"],
70 )),
71 commit_upper_accepted_times: registry.register(metric!(
72 name: "mz_source_commit_upper_accepted_times",
73 help: "The number of accepted remap bindings that are held in the reclock commit upper operator.",
74 var_labels: ["source_id", "worker_id"],
75 )),
76 offset_commit_failures: registry.register(metric!(
77 name: "mz_source_offset_commit_failures",
78 help: "A counter representing how many times we have failed to commit offsets for a source",
79 var_labels: ["source_id"],
80 )),
81 progress: registry.register(metric!(
82 name: "mz_source_progress",
83 help: "A timestamp gauge representing forward progess in the data shard",
84 var_labels: ["source_id", "shard", "worker_id"],
85 )),
86 row_inserts: registry.register(metric!(
87 name: "mz_source_row_inserts",
88 help: "A counter representing the actual number of rows being inserted to the data shard",
89 var_labels: ["source_id", "shard", "worker_id"],
90 )),
91 row_retractions: registry.register(metric!(
92 name: "mz_source_row_retractions",
93 help: "A counter representing the actual number of rows being retracted from the data shard",
94 var_labels: ["source_id", "shard", "worker_id"],
95 )),
96 error_inserts: registry.register(metric!(
97 name: "mz_source_error_inserts",
98 help: "A counter representing the actual number of errors being inserted to the data shard",
99 var_labels: ["source_id", "shard", "worker_id"],
100 )),
101 error_retractions: registry.register(metric!(
102 name: "mz_source_error_retractions",
103 help: "A counter representing the actual number of errors being retracted from the data shard",
104 var_labels: ["source_id", "shard", "worker_id"],
105 )),
106 persist_sink_processed_batches: registry.register(metric!(
107 name: "mz_source_processed_batches",
108 help: "A counter representing the number of persist sink batches with actual data \
109 we have successfully processed.",
110 var_labels: ["source_id", "shard", "worker_id"],
111 )),
112 }
113 }
114}
115
/// General (source-type-agnostic) metrics for a single source on a single
/// worker. Each metric is deleted from the registry when this struct is dropped.
pub(crate) struct SourceMetrics {
    // The resumption frontier chosen for this source's ingestion (labeled by source only).
    pub(crate) resume_upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    // Ready remap bindings held in the reclock commit upper operator (per worker).
    pub(crate) commit_upper_ready_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    // Accepted remap bindings held in the reclock commit upper operator (per worker).
    pub(crate) commit_upper_accepted_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}
125
126impl SourceMetrics {
127 pub(crate) fn new(
129 defs: &GeneralSourceMetricDefs,
130 source_id: GlobalId,
131 worker_id: usize,
132 ) -> SourceMetrics {
133 SourceMetrics {
134 resume_upper: defs
135 .resume_upper
136 .get_delete_on_drop_metric(vec![source_id.to_string()]),
137 commit_upper_ready_times: defs
138 .commit_upper_ready_times
139 .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
140 commit_upper_accepted_times: defs
141 .commit_upper_accepted_times
142 .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
143 }
144 }
145}
146
/// Metrics for a source's persist sink, tracking what is written to and
/// retracted from the data shard. All instances are labeled by
/// (source_id, shard, worker_id) and deleted from the registry on drop.
pub(crate) struct SourcePersistSinkMetrics {
    // Timestamp gauge of forward progress in the data shard.
    pub(crate) progress: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    // Counts of data rows inserted / retracted.
    pub(crate) row_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) row_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    // Counts of error records inserted / retracted.
    pub(crate) error_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) error_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    // Count of non-empty persist batches successfully processed.
    pub(crate) processed_batches: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
156
157impl SourcePersistSinkMetrics {
158 pub(crate) fn new(
160 defs: &GeneralSourceMetricDefs,
161 _source_id: GlobalId,
162 parent_source_id: GlobalId,
163 worker_id: usize,
164 shard_id: &mz_persist_client::ShardId,
165 ) -> SourcePersistSinkMetrics {
166 let shard = shard_id.to_string();
167 SourcePersistSinkMetrics {
168 progress: defs.progress.get_delete_on_drop_metric(vec![
169 parent_source_id.to_string(),
170 shard.clone(),
171 worker_id.to_string(),
172 ]),
173 row_inserts: defs.row_inserts.get_delete_on_drop_metric(vec![
174 parent_source_id.to_string(),
175 shard.clone(),
176 worker_id.to_string(),
177 ]),
178 row_retractions: defs.row_retractions.get_delete_on_drop_metric(vec![
179 parent_source_id.to_string(),
180 shard.clone(),
181 worker_id.to_string(),
182 ]),
183 error_inserts: defs.error_inserts.get_delete_on_drop_metric(vec![
184 parent_source_id.to_string(),
185 shard.clone(),
186 worker_id.to_string(),
187 ]),
188 error_retractions: defs.error_retractions.get_delete_on_drop_metric(vec![
189 parent_source_id.to_string(),
190 shard.clone(),
191 worker_id.to_string(),
192 ]),
193 processed_batches: defs
194 .persist_sink_processed_batches
195 .get_delete_on_drop_metric(vec![
196 parent_source_id.to_string(),
197 shard,
198 worker_id.to_string(),
199 ]),
200 }
201 }
202}
203
/// Metrics about committing offsets back upstream for one source.
/// The instance is deleted from the registry when this struct is dropped.
pub(crate) struct OffsetCommitMetrics {
    // Count of failed offset commits, labeled by source_id.
    pub(crate) offset_commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
209
210impl OffsetCommitMetrics {
211 pub(crate) fn new(defs: &GeneralSourceMetricDefs, source_id: GlobalId) -> OffsetCommitMetrics {
213 OffsetCommitMetrics {
214 offset_commit_failures: defs
215 .offset_commit_failures
216 .get_delete_on_drop_metric(vec![source_id.to_string()]),
217 }
218 }
219}
220
#[derive(Debug, Clone)]
/// Bundle of all source metric definitions: the general (source-type-agnostic)
/// definitions plus one set of definitions per supported source type, and a
/// process-wide bytes-read counter.
pub(crate) struct SourceMetricDefs {
    pub(crate) source_defs: GeneralSourceMetricDefs,
    pub(crate) postgres_defs: postgres::PgSourceMetricDefs,
    pub(crate) mysql_defs: mysql::MySqlSourceMetricDefs,
    pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
    pub(crate) sql_server_defs: sql_server::SqlServerSourceMetricDefs,
    // Unlabeled total of bytes read across all sources.
    pub(crate) bytes_read: IntCounter,
}
233
234impl SourceMetricDefs {
235 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
236 Self {
237 source_defs: GeneralSourceMetricDefs::register_with(registry),
238 postgres_defs: postgres::PgSourceMetricDefs::register_with(registry),
239 mysql_defs: mysql::MySqlSourceMetricDefs::register_with(registry),
240 kafka_source_defs: kafka::KafkaSourceMetricDefs::register_with(registry),
241 sql_server_defs: sql_server::SqlServerSourceMetricDefs::register_with(registry),
242 bytes_read: registry.register(metric!(
243 name: "mz_bytes_read_total",
244 help: "Count of bytes read from sources",
245 )),
246 }
247 }
248}