1use mz_ore::metric;
18use mz_ore::metrics::{
19 DeleteOnDropCounter, DeleteOnDropGauge, IntCounter, IntCounterVec, IntGaugeVec, MetricTag,
20 MetricVisibility, MetricsRegistry, UIntGaugeVec,
21};
22use mz_repr::GlobalId;
23use prometheus::core::{AtomicI64, AtomicU64};
24
25pub mod kafka;
26pub mod mysql;
27pub mod postgres;
28pub mod sql_server;
29
30#[derive(Clone, Debug)]
35pub(crate) struct GeneralSourceMetricDefs {
36 pub(crate) resume_upper: IntGaugeVec,
38 pub(crate) commit_upper_ready_times: UIntGaugeVec,
39 pub(crate) commit_upper_accepted_times: UIntGaugeVec,
40
41 pub(crate) offset_commit_failures: IntCounterVec,
43
44 pub(crate) progress: IntGaugeVec,
48 pub(crate) row_inserts: IntCounterVec,
49 pub(crate) row_retractions: IntCounterVec,
50 pub(crate) error_inserts: IntCounterVec,
51 pub(crate) error_retractions: IntCounterVec,
52 pub(crate) persist_sink_processed_batches: IntCounterVec,
53}
54
55impl GeneralSourceMetricDefs {
56 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
57 Self {
58 resume_upper: registry.register(metric!(
61 name: "mz_resume_upper",
62 help: "The timestamp-domain resumption frontier chosen for a source's ingestion",
64 var_labels: ["source_id"],
65 )),
66 commit_upper_ready_times: registry.register(metric!(
67 name: "mz_source_commit_upper_ready_times",
68 help: "The number of ready remap bindings that are held in the reclock commit upper operator.",
69 var_labels: ["source_id", "worker_id"],
70 )),
71 commit_upper_accepted_times: registry.register(metric!(
72 name: "mz_source_commit_upper_accepted_times",
73 help: "The number of accepted remap bindings that are held in the reclock commit upper operator.",
74 var_labels: ["source_id", "worker_id"],
75 )),
76 offset_commit_failures: registry.register(metric!(
77 name: "mz_source_offset_commit_failures",
78 help: "A counter representing how many times we have failed to commit offsets for a source",
79 var_labels: ["source_id"],
80 visibility: MetricVisibility::Public,
81 tags: [MetricTag::Source],
82 )),
83 progress: registry.register(metric!(
84 name: "mz_source_progress",
85 help: "A timestamp gauge representing forward progess in the data shard",
86 var_labels: ["source_id", "shard", "worker_id"],
87 )),
88 row_inserts: registry.register(metric!(
89 name: "mz_source_row_inserts",
90 help: "A counter representing the actual number of rows being inserted to the data shard",
91 var_labels: ["source_id", "shard", "worker_id"],
92 )),
93 row_retractions: registry.register(metric!(
94 name: "mz_source_row_retractions",
95 help: "A counter representing the actual number of rows being retracted from the data shard",
96 var_labels: ["source_id", "shard", "worker_id"],
97 )),
98 error_inserts: registry.register(metric!(
99 name: "mz_source_error_inserts",
100 help: "A counter representing the actual number of errors being inserted to the data shard",
101 var_labels: ["source_id", "shard", "worker_id"],
102 )),
103 error_retractions: registry.register(metric!(
104 name: "mz_source_error_retractions",
105 help: "A counter representing the actual number of errors being retracted from the data shard",
106 var_labels: ["source_id", "shard", "worker_id"],
107 )),
108 persist_sink_processed_batches: registry.register(metric!(
109 name: "mz_source_processed_batches",
110 help: "A counter representing the number of persist sink batches with actual data \
111 we have successfully processed.",
112 var_labels: ["source_id", "shard", "worker_id"],
113 )),
114 }
115 }
116}
117
118pub(crate) struct SourceMetrics {
120 pub(crate) resume_upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
122 pub(crate) commit_upper_ready_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
124 pub(crate) commit_upper_accepted_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
126}
127
128impl SourceMetrics {
129 pub(crate) fn new(
131 defs: &GeneralSourceMetricDefs,
132 source_id: GlobalId,
133 worker_id: usize,
134 ) -> SourceMetrics {
135 SourceMetrics {
136 resume_upper: defs
137 .resume_upper
138 .get_delete_on_drop_metric(vec![source_id.to_string()]),
139 commit_upper_ready_times: defs
140 .commit_upper_ready_times
141 .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
142 commit_upper_accepted_times: defs
143 .commit_upper_accepted_times
144 .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
145 }
146 }
147}
148
149pub(crate) struct SourcePersistSinkMetrics {
151 pub(crate) progress: DeleteOnDropGauge<AtomicI64, Vec<String>>,
152 pub(crate) row_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
153 pub(crate) row_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
154 pub(crate) error_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
155 pub(crate) error_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
156 pub(crate) processed_batches: DeleteOnDropCounter<AtomicU64, Vec<String>>,
157}
158
159impl SourcePersistSinkMetrics {
160 pub(crate) fn new(
162 defs: &GeneralSourceMetricDefs,
163 _source_id: GlobalId,
164 parent_source_id: GlobalId,
165 worker_id: usize,
166 shard_id: &mz_persist_client::ShardId,
167 ) -> SourcePersistSinkMetrics {
168 let shard = shard_id.to_string();
169 SourcePersistSinkMetrics {
170 progress: defs.progress.get_delete_on_drop_metric(vec![
171 parent_source_id.to_string(),
172 shard.clone(),
173 worker_id.to_string(),
174 ]),
175 row_inserts: defs.row_inserts.get_delete_on_drop_metric(vec![
176 parent_source_id.to_string(),
177 shard.clone(),
178 worker_id.to_string(),
179 ]),
180 row_retractions: defs.row_retractions.get_delete_on_drop_metric(vec![
181 parent_source_id.to_string(),
182 shard.clone(),
183 worker_id.to_string(),
184 ]),
185 error_inserts: defs.error_inserts.get_delete_on_drop_metric(vec![
186 parent_source_id.to_string(),
187 shard.clone(),
188 worker_id.to_string(),
189 ]),
190 error_retractions: defs.error_retractions.get_delete_on_drop_metric(vec![
191 parent_source_id.to_string(),
192 shard.clone(),
193 worker_id.to_string(),
194 ]),
195 processed_batches: defs
196 .persist_sink_processed_batches
197 .get_delete_on_drop_metric(vec![
198 parent_source_id.to_string(),
199 shard,
200 worker_id.to_string(),
201 ]),
202 }
203 }
204}
205
206pub(crate) struct OffsetCommitMetrics {
208 pub(crate) offset_commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
210}
211
212impl OffsetCommitMetrics {
213 pub(crate) fn new(defs: &GeneralSourceMetricDefs, source_id: GlobalId) -> OffsetCommitMetrics {
215 OffsetCommitMetrics {
216 offset_commit_failures: defs
217 .offset_commit_failures
218 .get_delete_on_drop_metric(vec![source_id.to_string()]),
219 }
220 }
221}
222
223#[derive(Debug, Clone)]
226pub(crate) struct SourceMetricDefs {
227 pub(crate) source_defs: GeneralSourceMetricDefs,
228 pub(crate) postgres_defs: postgres::PgSourceMetricDefs,
229 pub(crate) mysql_defs: mysql::MySqlSourceMetricDefs,
230 pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
231 pub(crate) sql_server_defs: sql_server::SqlServerSourceMetricDefs,
232 pub(crate) bytes_read: IntCounter,
234}
235
236impl SourceMetricDefs {
237 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
238 Self {
239 source_defs: GeneralSourceMetricDefs::register_with(registry),
240 postgres_defs: postgres::PgSourceMetricDefs::register_with(registry),
241 mysql_defs: mysql::MySqlSourceMetricDefs::register_with(registry),
242 kafka_source_defs: kafka::KafkaSourceMetricDefs::register_with(registry),
243 sql_server_defs: sql_server::SqlServerSourceMetricDefs::register_with(registry),
244 bytes_read: registry.register(metric!(
245 name: "mz_bytes_read_total",
246 help: "Count of bytes read from sources",
247 )),
248 }
249 }
250}