1use mz_ore::metric;
13use mz_ore::metrics::{
14 DeleteOnDropGauge, IntGaugeVec, MetricVisibility, MetricsRegistry, UIntGaugeVec,
15};
16use mz_repr::GlobalId;
17use prometheus::core::{AtomicI64, AtomicU64};
18
19#[derive(Clone, Debug)]
21pub(crate) struct KafkaSinkMetricDefs {
22 pub rdkafka_msg_cnt: UIntGaugeVec,
24 pub rdkafka_msg_size: UIntGaugeVec,
26 pub rdkafka_txmsgs: IntGaugeVec,
28 pub rdkafka_txmsg_bytes: IntGaugeVec,
30 pub rdkafka_tx: IntGaugeVec,
32 pub rdkafka_tx_bytes: IntGaugeVec,
34 pub rdkafka_outbuf_cnt: IntGaugeVec,
36 pub rdkafka_outbuf_msg_cnt: IntGaugeVec,
38 pub rdkafka_waitresp_cnt: IntGaugeVec,
40 pub rdkafka_waitresp_msg_cnt: IntGaugeVec,
42 pub rdkafka_txerrs: UIntGaugeVec,
44 pub rdkafka_txretries: UIntGaugeVec,
46 pub rdkafka_req_timeouts: UIntGaugeVec,
48 pub rdkafka_connects: IntGaugeVec,
51 pub rdkafka_disconnects: IntGaugeVec,
54 pub outstanding_progress_records: UIntGaugeVec,
56 pub consumed_progress_records: UIntGaugeVec,
58 pub partition_count: UIntGaugeVec,
60}
61
62impl KafkaSinkMetricDefs {
63 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
64 Self {
65 rdkafka_msg_cnt: registry.register(metric!(
66 name: "mz_sink_rdkafka_msg_cnt",
67 help: "The current number of messages in producer queues.",
68 var_labels: ["sink_id"],
69 )),
70 rdkafka_msg_size: registry.register(metric!(
71 name: "mz_sink_rdkafka_msg_size",
72 help: "The current total size of messages in producer queues.",
73 var_labels: ["sink_id"],
74 )),
75 rdkafka_txmsgs: registry.register(metric!(
76 name: "mz_sink_rdkafka_txmsgs",
77 help: "The total number of messages transmitted (produced) to brokers.",
78 var_labels: ["sink_id"],
79 )),
80 rdkafka_txmsg_bytes: registry.register(metric!(
81 name: "mz_sink_rdkafka_txmsg_bytes",
82 help: "The total number of bytes transmitted (produced) to brokers.",
83 var_labels: ["sink_id"],
84 )),
85 rdkafka_tx: registry.register(metric!(
86 name: "mz_sink_rdkafka_tx",
87 help: "The total number of requests sent to brokers.",
88 var_labels: ["sink_id"],
89 )),
90 rdkafka_tx_bytes: registry.register(metric!(
91 name: "mz_sink_rdkafka_tx_bytes",
92 help: "The total number of bytes transmitted to brokers.",
93 var_labels: ["sink_id"],
94 )),
95 rdkafka_outbuf_cnt: registry.register(metric!(
96 name: "mz_sink_rdkafka_outbuf_cnt",
97 help: "The number of requests awaiting transmission across all brokers.",
98 var_labels: ["sink_id"],
99 )),
100 rdkafka_outbuf_msg_cnt: registry.register(metric!(
101 name: "mz_sink_rdkafka_outbuf_msg_cnt",
102 help: "The number of messages awaiting transmission across all brokers.",
103 var_labels: ["sink_id"],
104 visibility: MetricVisibility::Public,
105 )),
106 rdkafka_waitresp_cnt: registry.register(metric!(
107 name: "mz_sink_rdkafka_waitresp_cnt",
108 help: "The number of requests in-flight across all brokers that are awaiting a \
109 response.",
110 var_labels: ["sink_id"],
111 )),
112 rdkafka_waitresp_msg_cnt: registry.register(metric!(
113 name: "mz_sink_rdkafka_waitresp_msg_cnt",
114 help: "The number of messages in-flight across all brokers that are awaiting a \
115 response.",
116 var_labels: ["sink_id"],
117 )),
118 rdkafka_txerrs: registry.register(metric!(
119 name: "mz_sink_rdkafka_txerrs",
120 help: "The total number of transmission errors across all brokers.",
121 var_labels: ["sink_id"],
122 visibility: MetricVisibility::Public,
123 )),
124 rdkafka_txretries: registry.register(metric!(
125 name: "mz_sink_rdkafka_txretries",
126 help: "The total number of request retries across all brokers.",
127 var_labels: ["sink_id"],
128 )),
129 rdkafka_req_timeouts: registry.register(metric!(
130 name: "mz_sink_rdkafka_req_timeouts",
131 help: "The total number of requests that timed out across all brokers.",
132 var_labels: ["sink_id"],
133 )),
134 rdkafka_connects: registry.register(metric!(
135 name: "mz_sink_rdkafka_connects",
136 help: "The number of connection attempts, including successful and failed \
137 attempts, and name resolution failures across all brokers.",
138 var_labels: ["sink_id"],
139 visibility: MetricVisibility::Public,
140 )),
141 rdkafka_disconnects: registry.register(metric!(
142 name: "mz_sink_rdkafka_disconnects",
143 help: "The number of disconnections, whether triggered by the broker, the \
144 network, the load balancer, or something else across all brokers.",
145 var_labels: ["sink_id"],
146 visibility: MetricVisibility::Public,
147 )),
148 outstanding_progress_records: registry.register(metric!(
149 name: "mz_sink_oustanding_progress_records",
150 help: "The number of outstanding progress records that need to be read before the sink can resume.",
151 var_labels: ["sink_id"],
152 )),
153 consumed_progress_records: registry.register(metric!(
154 name: "mz_sink_consumed_progress_records",
155 help: "The number of progress records consumed by the sink.",
156 var_labels: ["sink_id"],
157 )),
158 partition_count: registry.register(metric!(
159 name: "mz_sink_partition_count",
160 help: "The number of partitions this sink is publishing to.",
161 var_labels: ["sink_id"],
162 )),
163 }
164 }
165}
166
167pub(crate) struct KafkaSinkMetrics {
169 pub rdkafka_msg_cnt: DeleteOnDropGauge<AtomicU64, Vec<String>>,
171 pub rdkafka_msg_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
173 pub rdkafka_txmsgs: DeleteOnDropGauge<AtomicI64, Vec<String>>,
175 pub rdkafka_txmsg_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
177 pub rdkafka_tx: DeleteOnDropGauge<AtomicI64, Vec<String>>,
179 pub rdkafka_tx_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
181 pub rdkafka_outbuf_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
183 pub rdkafka_outbuf_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
185 pub rdkafka_waitresp_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
187 pub rdkafka_waitresp_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
189 pub rdkafka_txerrs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
191 pub rdkafka_txretries: DeleteOnDropGauge<AtomicU64, Vec<String>>,
193 pub rdkafka_req_timeouts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
195 pub rdkafka_connects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
198 pub rdkafka_disconnects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
201 pub outstanding_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
203 pub consumed_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
205 pub partition_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
207}
208
209impl KafkaSinkMetrics {
210 pub fn new(defs: &KafkaSinkMetricDefs, sink_id: GlobalId) -> Self {
212 let labels = &[sink_id.to_string()];
213 Self {
214 rdkafka_msg_cnt: defs
215 .rdkafka_msg_cnt
216 .get_delete_on_drop_metric(labels.to_vec()),
217 rdkafka_msg_size: defs
218 .rdkafka_msg_size
219 .get_delete_on_drop_metric(labels.to_vec()),
220 rdkafka_txmsgs: defs
221 .rdkafka_txmsgs
222 .get_delete_on_drop_metric(labels.to_vec()),
223 rdkafka_txmsg_bytes: defs
224 .rdkafka_txmsg_bytes
225 .get_delete_on_drop_metric(labels.to_vec()),
226 rdkafka_tx: defs.rdkafka_tx.get_delete_on_drop_metric(labels.to_vec()),
227 rdkafka_tx_bytes: defs
228 .rdkafka_tx_bytes
229 .get_delete_on_drop_metric(labels.to_vec()),
230 rdkafka_outbuf_cnt: defs
231 .rdkafka_outbuf_cnt
232 .get_delete_on_drop_metric(labels.to_vec()),
233 rdkafka_outbuf_msg_cnt: defs
234 .rdkafka_outbuf_msg_cnt
235 .get_delete_on_drop_metric(labels.to_vec()),
236 rdkafka_waitresp_cnt: defs
237 .rdkafka_waitresp_cnt
238 .get_delete_on_drop_metric(labels.to_vec()),
239 rdkafka_waitresp_msg_cnt: defs
240 .rdkafka_waitresp_msg_cnt
241 .get_delete_on_drop_metric(labels.to_vec()),
242 rdkafka_txerrs: defs
243 .rdkafka_txerrs
244 .get_delete_on_drop_metric(labels.to_vec()),
245 rdkafka_txretries: defs
246 .rdkafka_txretries
247 .get_delete_on_drop_metric(labels.to_vec()),
248 rdkafka_req_timeouts: defs
249 .rdkafka_req_timeouts
250 .get_delete_on_drop_metric(labels.to_vec()),
251 rdkafka_connects: defs
252 .rdkafka_connects
253 .get_delete_on_drop_metric(labels.to_vec()),
254 rdkafka_disconnects: defs
255 .rdkafka_disconnects
256 .get_delete_on_drop_metric(labels.to_vec()),
257 outstanding_progress_records: defs
258 .outstanding_progress_records
259 .get_delete_on_drop_metric(labels.to_vec()),
260 consumed_progress_records: defs
261 .consumed_progress_records
262 .get_delete_on_drop_metric(labels.to_vec()),
263 partition_count: defs
264 .partition_count
265 .get_delete_on_drop_metric(labels.to_vec()),
266 }
267 }
268}