1use mz_ore::metric;
18use mz_ore::metrics::{
19 DeleteOnDropCounter, DeleteOnDropGauge, IntCounter, IntCounterVec, IntGaugeVec,
20 MetricVisibility, MetricsRegistry, UIntGaugeVec,
21};
22use mz_repr::GlobalId;
23use prometheus::core::{AtomicI64, AtomicU64};
24
25pub mod kafka;
26pub mod mysql;
27pub mod postgres;
28pub mod sql_server;
29
30#[derive(Clone, Debug)]
35pub(crate) struct GeneralSourceMetricDefs {
36 pub(crate) resume_upper: IntGaugeVec,
38 pub(crate) commit_upper_ready_times: UIntGaugeVec,
39 pub(crate) commit_upper_accepted_times: UIntGaugeVec,
40
41 pub(crate) offset_commit_failures: IntCounterVec,
43
44 pub(crate) progress: IntGaugeVec,
48 pub(crate) row_inserts: IntCounterVec,
49 pub(crate) row_retractions: IntCounterVec,
50 pub(crate) error_inserts: IntCounterVec,
51 pub(crate) error_retractions: IntCounterVec,
52 pub(crate) persist_sink_processed_batches: IntCounterVec,
53}
54
55impl GeneralSourceMetricDefs {
56 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
57 Self {
58 resume_upper: registry.register(metric!(
61 name: "mz_resume_upper",
62 help: "The timestamp-domain resumption frontier chosen for a source's ingestion",
64 var_labels: ["source_id"],
65 )),
66 commit_upper_ready_times: registry.register(metric!(
67 name: "mz_source_commit_upper_ready_times",
68 help: "The number of ready remap bindings that are held in the reclock commit upper operator.",
69 var_labels: ["source_id", "worker_id"],
70 )),
71 commit_upper_accepted_times: registry.register(metric!(
72 name: "mz_source_commit_upper_accepted_times",
73 help: "The number of accepted remap bindings that are held in the reclock commit upper operator.",
74 var_labels: ["source_id", "worker_id"],
75 )),
76 offset_commit_failures: registry.register(metric!(
77 name: "mz_source_offset_commit_failures",
78 help: "A counter representing how many times we have failed to commit offsets for a source",
79 var_labels: ["source_id"],
80 visibility: MetricVisibility::Public,
81 )),
82 progress: registry.register(metric!(
83 name: "mz_source_progress",
84 help: "A timestamp gauge representing forward progess in the data shard",
85 var_labels: ["source_id", "shard", "worker_id"],
86 )),
87 row_inserts: registry.register(metric!(
88 name: "mz_source_row_inserts",
89 help: "A counter representing the actual number of rows being inserted to the data shard",
90 var_labels: ["source_id", "shard", "worker_id"],
91 )),
92 row_retractions: registry.register(metric!(
93 name: "mz_source_row_retractions",
94 help: "A counter representing the actual number of rows being retracted from the data shard",
95 var_labels: ["source_id", "shard", "worker_id"],
96 )),
97 error_inserts: registry.register(metric!(
98 name: "mz_source_error_inserts",
99 help: "A counter representing the actual number of errors being inserted to the data shard",
100 var_labels: ["source_id", "shard", "worker_id"],
101 )),
102 error_retractions: registry.register(metric!(
103 name: "mz_source_error_retractions",
104 help: "A counter representing the actual number of errors being retracted from the data shard",
105 var_labels: ["source_id", "shard", "worker_id"],
106 )),
107 persist_sink_processed_batches: registry.register(metric!(
108 name: "mz_source_processed_batches",
109 help: "A counter representing the number of persist sink batches with actual data \
110 we have successfully processed.",
111 var_labels: ["source_id", "shard", "worker_id"],
112 )),
113 }
114 }
115}
116
117pub(crate) struct SourceMetrics {
119 pub(crate) resume_upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
121 pub(crate) commit_upper_ready_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
123 pub(crate) commit_upper_accepted_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
125}
126
127impl SourceMetrics {
128 pub(crate) fn new(
130 defs: &GeneralSourceMetricDefs,
131 source_id: GlobalId,
132 worker_id: usize,
133 ) -> SourceMetrics {
134 SourceMetrics {
135 resume_upper: defs
136 .resume_upper
137 .get_delete_on_drop_metric(vec![source_id.to_string()]),
138 commit_upper_ready_times: defs
139 .commit_upper_ready_times
140 .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
141 commit_upper_accepted_times: defs
142 .commit_upper_accepted_times
143 .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
144 }
145 }
146}
147
148pub(crate) struct SourcePersistSinkMetrics {
150 pub(crate) progress: DeleteOnDropGauge<AtomicI64, Vec<String>>,
151 pub(crate) row_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
152 pub(crate) row_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
153 pub(crate) error_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
154 pub(crate) error_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
155 pub(crate) processed_batches: DeleteOnDropCounter<AtomicU64, Vec<String>>,
156}
157
158impl SourcePersistSinkMetrics {
159 pub(crate) fn new(
161 defs: &GeneralSourceMetricDefs,
162 _source_id: GlobalId,
163 parent_source_id: GlobalId,
164 worker_id: usize,
165 shard_id: &mz_persist_client::ShardId,
166 ) -> SourcePersistSinkMetrics {
167 let shard = shard_id.to_string();
168 SourcePersistSinkMetrics {
169 progress: defs.progress.get_delete_on_drop_metric(vec![
170 parent_source_id.to_string(),
171 shard.clone(),
172 worker_id.to_string(),
173 ]),
174 row_inserts: defs.row_inserts.get_delete_on_drop_metric(vec![
175 parent_source_id.to_string(),
176 shard.clone(),
177 worker_id.to_string(),
178 ]),
179 row_retractions: defs.row_retractions.get_delete_on_drop_metric(vec![
180 parent_source_id.to_string(),
181 shard.clone(),
182 worker_id.to_string(),
183 ]),
184 error_inserts: defs.error_inserts.get_delete_on_drop_metric(vec![
185 parent_source_id.to_string(),
186 shard.clone(),
187 worker_id.to_string(),
188 ]),
189 error_retractions: defs.error_retractions.get_delete_on_drop_metric(vec![
190 parent_source_id.to_string(),
191 shard.clone(),
192 worker_id.to_string(),
193 ]),
194 processed_batches: defs
195 .persist_sink_processed_batches
196 .get_delete_on_drop_metric(vec![
197 parent_source_id.to_string(),
198 shard,
199 worker_id.to_string(),
200 ]),
201 }
202 }
203}
204
205pub(crate) struct OffsetCommitMetrics {
207 pub(crate) offset_commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
209}
210
211impl OffsetCommitMetrics {
212 pub(crate) fn new(defs: &GeneralSourceMetricDefs, source_id: GlobalId) -> OffsetCommitMetrics {
214 OffsetCommitMetrics {
215 offset_commit_failures: defs
216 .offset_commit_failures
217 .get_delete_on_drop_metric(vec![source_id.to_string()]),
218 }
219 }
220}
221
222#[derive(Debug, Clone)]
225pub(crate) struct SourceMetricDefs {
226 pub(crate) source_defs: GeneralSourceMetricDefs,
227 pub(crate) postgres_defs: postgres::PgSourceMetricDefs,
228 pub(crate) mysql_defs: mysql::MySqlSourceMetricDefs,
229 pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
230 pub(crate) sql_server_defs: sql_server::SqlServerSourceMetricDefs,
231 pub(crate) bytes_read: IntCounter,
233}
234
235impl SourceMetricDefs {
236 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
237 Self {
238 source_defs: GeneralSourceMetricDefs::register_with(registry),
239 postgres_defs: postgres::PgSourceMetricDefs::register_with(registry),
240 mysql_defs: mysql::MySqlSourceMetricDefs::register_with(registry),
241 kafka_source_defs: kafka::KafkaSourceMetricDefs::register_with(registry),
242 sql_server_defs: sql_server::SqlServerSourceMetricDefs::register_with(registry),
243 bytes_read: registry.register(metric!(
244 name: "mz_bytes_read_total",
245 help: "Count of bytes read from sources",
246 )),
247 }
248 }
249}