1use mz_ore::metric;
13use mz_ore::metrics::{DeleteOnDropGauge, IntGaugeVec, MetricsRegistry, UIntGaugeVec};
14use mz_repr::GlobalId;
15use prometheus::core::{AtomicI64, AtomicU64};
16
17#[derive(Clone, Debug)]
19pub(crate) struct KafkaSinkMetricDefs {
20 pub rdkafka_msg_cnt: UIntGaugeVec,
22 pub rdkafka_msg_size: UIntGaugeVec,
24 pub rdkafka_txmsgs: IntGaugeVec,
26 pub rdkafka_txmsg_bytes: IntGaugeVec,
28 pub rdkafka_tx: IntGaugeVec,
30 pub rdkafka_tx_bytes: IntGaugeVec,
32 pub rdkafka_outbuf_cnt: IntGaugeVec,
34 pub rdkafka_outbuf_msg_cnt: IntGaugeVec,
36 pub rdkafka_waitresp_cnt: IntGaugeVec,
38 pub rdkafka_waitresp_msg_cnt: IntGaugeVec,
40 pub rdkafka_txerrs: UIntGaugeVec,
42 pub rdkafka_txretries: UIntGaugeVec,
44 pub rdkafka_req_timeouts: UIntGaugeVec,
46 pub rdkafka_connects: IntGaugeVec,
49 pub rdkafka_disconnects: IntGaugeVec,
52 pub outstanding_progress_records: UIntGaugeVec,
54 pub consumed_progress_records: UIntGaugeVec,
56 pub partition_count: UIntGaugeVec,
58}
59
60impl KafkaSinkMetricDefs {
61 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
62 Self {
63 rdkafka_msg_cnt: registry.register(metric!(
64 name: "mz_sink_rdkafka_msg_cnt",
65 help: "The current number of messages in producer queues.",
66 var_labels: ["sink_id"],
67 )),
68 rdkafka_msg_size: registry.register(metric!(
69 name: "mz_sink_rdkafka_msg_size",
70 help: "The current total size of messages in producer queues.",
71 var_labels: ["sink_id"],
72 )),
73 rdkafka_txmsgs: registry.register(metric!(
74 name: "mz_sink_rdkafka_txmsgs",
75 help: "The total number of messages transmitted (produced) to brokers.",
76 var_labels: ["sink_id"],
77 )),
78 rdkafka_txmsg_bytes: registry.register(metric!(
79 name: "mz_sink_rdkafka_txmsg_bytes",
80 help: "The total number of bytes transmitted (produced) to brokers.",
81 var_labels: ["sink_id"],
82 )),
83 rdkafka_tx: registry.register(metric!(
84 name: "mz_sink_rdkafka_tx",
85 help: "The total number of requests sent to brokers.",
86 var_labels: ["sink_id"],
87 )),
88 rdkafka_tx_bytes: registry.register(metric!(
89 name: "mz_sink_rdkafka_tx_bytes",
90 help: "The total number of bytes transmitted to brokers.",
91 var_labels: ["sink_id"],
92 )),
93 rdkafka_outbuf_cnt: registry.register(metric!(
94 name: "mz_sink_rdkafka_outbuf_cnt",
95 help: "The number of requests awaiting transmission across all brokers.",
96 var_labels: ["sink_id"],
97 )),
98 rdkafka_outbuf_msg_cnt: registry.register(metric!(
99 name: "mz_sink_rdkafka_outbuf_msg_cnt",
100 help: "The number of messages awaiting transmission across all brokers.",
101 var_labels: ["sink_id"],
102 )),
103 rdkafka_waitresp_cnt: registry.register(metric!(
104 name: "mz_sink_rdkafka_waitresp_cnt",
105 help: "The number of requests in-flight across all brokers that are awaiting a \
106 response.",
107 var_labels: ["sink_id"],
108 )),
109 rdkafka_waitresp_msg_cnt: registry.register(metric!(
110 name: "mz_sink_rdkafka_waitresp_msg_cnt",
111 help: "The number of messages in-flight across all brokers that are awaiting a \
112 response.",
113 var_labels: ["sink_id"],
114 )),
115 rdkafka_txerrs: registry.register(metric!(
116 name: "mz_sink_rdkafka_txerrs",
117 help: "The total number of transmission errors across all brokers.",
118 var_labels: ["sink_id"],
119 )),
120 rdkafka_txretries: registry.register(metric!(
121 name: "mz_sink_rdkafka_txretries",
122 help: "The total number of request retries across all brokers.",
123 var_labels: ["sink_id"],
124 )),
125 rdkafka_req_timeouts: registry.register(metric!(
126 name: "mz_sink_rdkafka_req_timeouts",
127 help: "The total number of requests that timed out across all brokers.",
128 var_labels: ["sink_id"],
129 )),
130 rdkafka_connects: registry.register(metric!(
131 name: "mz_sink_rdkafka_connects",
132 help: "The number of connection attempts, including successful and failed \
133 attempts, and name resolution failures across all brokers.",
134 var_labels: ["sink_id"],
135 )),
136 rdkafka_disconnects: registry.register(metric!(
137 name: "mz_sink_rdkafka_disconnects",
138 help: "The number of disconnections, whether triggered by the broker, the \
139 network, the load balancer, or something else across all brokers.",
140 var_labels: ["sink_id"],
141 )),
142 outstanding_progress_records: registry.register(metric!(
143 name: "mz_sink_oustanding_progress_records",
144 help: "The number of outstanding progress records that need to be read before the sink can resume.",
145 var_labels: ["sink_id"],
146 )),
147 consumed_progress_records: registry.register(metric!(
148 name: "mz_sink_consumed_progress_records",
149 help: "The number of progress records consumed by the sink.",
150 var_labels: ["sink_id"],
151 )),
152 partition_count: registry.register(metric!(
153 name: "mz_sink_partition_count",
154 help: "The number of partitions this sink is publishing to.",
155 var_labels: ["sink_id"],
156 )),
157 }
158 }
159}
160
161pub(crate) struct KafkaSinkMetrics {
163 pub rdkafka_msg_cnt: DeleteOnDropGauge<AtomicU64, Vec<String>>,
165 pub rdkafka_msg_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
167 pub rdkafka_txmsgs: DeleteOnDropGauge<AtomicI64, Vec<String>>,
169 pub rdkafka_txmsg_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
171 pub rdkafka_tx: DeleteOnDropGauge<AtomicI64, Vec<String>>,
173 pub rdkafka_tx_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
175 pub rdkafka_outbuf_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
177 pub rdkafka_outbuf_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
179 pub rdkafka_waitresp_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
181 pub rdkafka_waitresp_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
183 pub rdkafka_txerrs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
185 pub rdkafka_txretries: DeleteOnDropGauge<AtomicU64, Vec<String>>,
187 pub rdkafka_req_timeouts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
189 pub rdkafka_connects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
192 pub rdkafka_disconnects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
195 pub outstanding_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
197 pub consumed_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
199 pub partition_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
201}
202
203impl KafkaSinkMetrics {
204 pub fn new(defs: &KafkaSinkMetricDefs, sink_id: GlobalId) -> Self {
206 let labels = &[sink_id.to_string()];
207 Self {
208 rdkafka_msg_cnt: defs
209 .rdkafka_msg_cnt
210 .get_delete_on_drop_metric(labels.to_vec()),
211 rdkafka_msg_size: defs
212 .rdkafka_msg_size
213 .get_delete_on_drop_metric(labels.to_vec()),
214 rdkafka_txmsgs: defs
215 .rdkafka_txmsgs
216 .get_delete_on_drop_metric(labels.to_vec()),
217 rdkafka_txmsg_bytes: defs
218 .rdkafka_txmsg_bytes
219 .get_delete_on_drop_metric(labels.to_vec()),
220 rdkafka_tx: defs.rdkafka_tx.get_delete_on_drop_metric(labels.to_vec()),
221 rdkafka_tx_bytes: defs
222 .rdkafka_tx_bytes
223 .get_delete_on_drop_metric(labels.to_vec()),
224 rdkafka_outbuf_cnt: defs
225 .rdkafka_outbuf_cnt
226 .get_delete_on_drop_metric(labels.to_vec()),
227 rdkafka_outbuf_msg_cnt: defs
228 .rdkafka_outbuf_msg_cnt
229 .get_delete_on_drop_metric(labels.to_vec()),
230 rdkafka_waitresp_cnt: defs
231 .rdkafka_waitresp_cnt
232 .get_delete_on_drop_metric(labels.to_vec()),
233 rdkafka_waitresp_msg_cnt: defs
234 .rdkafka_waitresp_msg_cnt
235 .get_delete_on_drop_metric(labels.to_vec()),
236 rdkafka_txerrs: defs
237 .rdkafka_txerrs
238 .get_delete_on_drop_metric(labels.to_vec()),
239 rdkafka_txretries: defs
240 .rdkafka_txretries
241 .get_delete_on_drop_metric(labels.to_vec()),
242 rdkafka_req_timeouts: defs
243 .rdkafka_req_timeouts
244 .get_delete_on_drop_metric(labels.to_vec()),
245 rdkafka_connects: defs
246 .rdkafka_connects
247 .get_delete_on_drop_metric(labels.to_vec()),
248 rdkafka_disconnects: defs
249 .rdkafka_disconnects
250 .get_delete_on_drop_metric(labels.to_vec()),
251 outstanding_progress_records: defs
252 .outstanding_progress_records
253 .get_delete_on_drop_metric(labels.to_vec()),
254 consumed_progress_records: defs
255 .consumed_progress_records
256 .get_delete_on_drop_metric(labels.to_vec()),
257 partition_count: defs
258 .partition_count
259 .get_delete_on_drop_metric(labels.to_vec()),
260 }
261 }
262}