1use mz_ore::metric;
18use mz_ore::metrics::{
19 DeleteOnDropCounter, DeleteOnDropGauge, IntCounter, IntCounterVec, IntGaugeVec,
20 MetricsRegistry, UIntGaugeVec,
21};
22use mz_repr::GlobalId;
23use prometheus::core::{AtomicI64, AtomicU64};
24
25pub mod kafka;
26pub mod mysql;
27pub mod postgres;
28
/// Definitions (unlabeled metric vectors) for metrics common to all source
/// types. Labeled, delete-on-drop instances are created from these by the
/// `SourceMetrics`, `SourcePersistSinkMetrics`, and `OffsetCommitMetrics`
/// constructors below.
#[derive(Clone, Debug)]
pub(crate) struct GeneralSourceMetricDefs {
    /// The timestamp-domain resumption frontier chosen for a source's
    /// ingestion (labels: source_id).
    pub(crate) resume_upper: IntGaugeVec,
    /// Number of ready remap bindings held in the reclock commit upper
    /// operator (labels: source_id, worker_id).
    pub(crate) commit_upper_ready_times: UIntGaugeVec,
    /// Number of accepted remap bindings held in the reclock commit upper
    /// operator (labels: source_id, worker_id).
    pub(crate) commit_upper_accepted_times: UIntGaugeVec,

    /// How many times committing offsets for a source has failed
    /// (labels: source_id).
    pub(crate) offset_commit_failures: IntCounterVec,

    // The following metrics are all labeled by (source_id, shard, worker_id)
    // and track the persist sink's interaction with the data shard.
    /// Timestamp gauge of forward progress in the data shard.
    pub(crate) progress: IntGaugeVec,
    /// Rows inserted into the data shard.
    pub(crate) row_inserts: IntCounterVec,
    /// Rows retracted from the data shard.
    pub(crate) row_retractions: IntCounterVec,
    /// Errors inserted into the data shard.
    pub(crate) error_inserts: IntCounterVec,
    /// Errors retracted from the data shard.
    pub(crate) error_retractions: IntCounterVec,
    /// Persist sink batches with actual data that were successfully processed.
    pub(crate) persist_sink_processed_batches: IntCounterVec,
}
53
54impl GeneralSourceMetricDefs {
55 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
56 Self {
57 resume_upper: registry.register(metric!(
60 name: "mz_resume_upper",
61 help: "The timestamp-domain resumption frontier chosen for a source's ingestion",
63 var_labels: ["source_id"],
64 )),
65 commit_upper_ready_times: registry.register(metric!(
66 name: "mz_source_commit_upper_ready_times",
67 help: "The number of ready remap bindings that are held in the reclock commit upper operator.",
68 var_labels: ["source_id", "worker_id"],
69 )),
70 commit_upper_accepted_times: registry.register(metric!(
71 name: "mz_source_commit_upper_accepted_times",
72 help: "The number of accepted remap bindings that are held in the reclock commit upper operator.",
73 var_labels: ["source_id", "worker_id"],
74 )),
75 offset_commit_failures: registry.register(metric!(
76 name: "mz_source_offset_commit_failures",
77 help: "A counter representing how many times we have failed to commit offsets for a source",
78 var_labels: ["source_id"],
79 )),
80 progress: registry.register(metric!(
81 name: "mz_source_progress",
82 help: "A timestamp gauge representing forward progess in the data shard",
83 var_labels: ["source_id", "shard", "worker_id"],
84 )),
85 row_inserts: registry.register(metric!(
86 name: "mz_source_row_inserts",
87 help: "A counter representing the actual number of rows being inserted to the data shard",
88 var_labels: ["source_id", "shard", "worker_id"],
89 )),
90 row_retractions: registry.register(metric!(
91 name: "mz_source_row_retractions",
92 help: "A counter representing the actual number of rows being retracted from the data shard",
93 var_labels: ["source_id", "shard", "worker_id"],
94 )),
95 error_inserts: registry.register(metric!(
96 name: "mz_source_error_inserts",
97 help: "A counter representing the actual number of errors being inserted to the data shard",
98 var_labels: ["source_id", "shard", "worker_id"],
99 )),
100 error_retractions: registry.register(metric!(
101 name: "mz_source_error_retractions",
102 help: "A counter representing the actual number of errors being retracted from the data shard",
103 var_labels: ["source_id", "shard", "worker_id"],
104 )),
105 persist_sink_processed_batches: registry.register(metric!(
106 name: "mz_source_processed_batches",
107 help: "A counter representing the number of persist sink batches with actual data \
108 we have successfully processed.",
109 var_labels: ["source_id", "shard", "worker_id"],
110 )),
111 }
112 }
113}
114
/// General, per-worker metrics for a single source, instantiated from
/// `GeneralSourceMetricDefs`. Each field is a delete-on-drop metric, so the
/// labeled series is removed when this struct is dropped.
pub(crate) struct SourceMetrics {
    /// The resumption frontier chosen for this source's ingestion
    /// (labels: source_id).
    pub(crate) resume_upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// Ready remap bindings held in the reclock commit upper operator
    /// (labels: source_id, worker_id).
    pub(crate) commit_upper_ready_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Accepted remap bindings held in the reclock commit upper operator
    /// (labels: source_id, worker_id).
    pub(crate) commit_upper_accepted_times: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}
124
125impl SourceMetrics {
126 pub(crate) fn new(
128 defs: &GeneralSourceMetricDefs,
129 source_id: GlobalId,
130 worker_id: usize,
131 ) -> SourceMetrics {
132 SourceMetrics {
133 resume_upper: defs
134 .resume_upper
135 .get_delete_on_drop_metric(vec![source_id.to_string()]),
136 commit_upper_ready_times: defs
137 .commit_upper_ready_times
138 .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
139 commit_upper_accepted_times: defs
140 .commit_upper_accepted_times
141 .get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
142 }
143 }
144}
145
/// Metrics for a source's persist sink, all labeled by
/// (source_id, shard, worker_id). Each field is a delete-on-drop metric, so
/// the labeled series is removed when this struct is dropped.
pub(crate) struct SourcePersistSinkMetrics {
    /// Timestamp gauge of forward progress in the data shard.
    pub(crate) progress: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// Rows inserted into the data shard.
    pub(crate) row_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Rows retracted from the data shard.
    pub(crate) row_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Errors inserted into the data shard.
    pub(crate) error_inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Errors retracted from the data shard.
    pub(crate) error_retractions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Persist sink batches with actual data that were successfully processed.
    pub(crate) processed_batches: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
155
156impl SourcePersistSinkMetrics {
157 pub(crate) fn new(
159 defs: &GeneralSourceMetricDefs,
160 _source_id: GlobalId,
161 parent_source_id: GlobalId,
162 worker_id: usize,
163 shard_id: &mz_persist_client::ShardId,
164 ) -> SourcePersistSinkMetrics {
165 let shard = shard_id.to_string();
166 SourcePersistSinkMetrics {
167 progress: defs.progress.get_delete_on_drop_metric(vec![
168 parent_source_id.to_string(),
169 shard.clone(),
170 worker_id.to_string(),
171 ]),
172 row_inserts: defs.row_inserts.get_delete_on_drop_metric(vec![
173 parent_source_id.to_string(),
174 shard.clone(),
175 worker_id.to_string(),
176 ]),
177 row_retractions: defs.row_retractions.get_delete_on_drop_metric(vec![
178 parent_source_id.to_string(),
179 shard.clone(),
180 worker_id.to_string(),
181 ]),
182 error_inserts: defs.error_inserts.get_delete_on_drop_metric(vec![
183 parent_source_id.to_string(),
184 shard.clone(),
185 worker_id.to_string(),
186 ]),
187 error_retractions: defs.error_retractions.get_delete_on_drop_metric(vec![
188 parent_source_id.to_string(),
189 shard.clone(),
190 worker_id.to_string(),
191 ]),
192 processed_batches: defs
193 .persist_sink_processed_batches
194 .get_delete_on_drop_metric(vec![
195 parent_source_id.to_string(),
196 shard,
197 worker_id.to_string(),
198 ]),
199 }
200 }
201}
202
/// Metrics about committing offsets back to an upstream system.
pub(crate) struct OffsetCommitMetrics {
    /// How many times committing offsets for this source has failed
    /// (labels: source_id). The series is removed when this struct is dropped.
    pub(crate) offset_commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
208
209impl OffsetCommitMetrics {
210 pub(crate) fn new(defs: &GeneralSourceMetricDefs, source_id: GlobalId) -> OffsetCommitMetrics {
212 OffsetCommitMetrics {
213 offset_commit_failures: defs
214 .offset_commit_failures
215 .get_delete_on_drop_metric(vec![source_id.to_string()]),
216 }
217 }
218}
219
/// Bundle of every metric definition used for sources: the general
/// definitions plus the connector-specific (Postgres, MySQL, Kafka) ones.
#[derive(Debug, Clone)]
pub(crate) struct SourceMetricDefs {
    /// Definitions shared by all source types.
    pub(crate) source_defs: GeneralSourceMetricDefs,
    /// Postgres-source-specific metric definitions.
    pub(crate) postgres_defs: postgres::PgSourceMetricDefs,
    /// MySQL-source-specific metric definitions.
    pub(crate) mysql_defs: mysql::MySqlSourceMetricDefs,
    /// Kafka-source-specific metric definitions.
    pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
    /// Count of bytes read from sources (a single unlabeled counter).
    pub(crate) bytes_read: IntCounter,
}
231
232impl SourceMetricDefs {
233 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
234 Self {
235 source_defs: GeneralSourceMetricDefs::register_with(registry),
236 postgres_defs: postgres::PgSourceMetricDefs::register_with(registry),
237 mysql_defs: mysql::MySqlSourceMetricDefs::register_with(registry),
238 kafka_source_defs: kafka::KafkaSourceMetricDefs::register_with(registry),
239 bytes_read: registry.register(metric!(
240 name: "mz_bytes_read_total",
241 help: "Count of bytes read from sources",
242 )),
243 }
244 }
245}