1use mz_ore::metric;
13use mz_ore::metrics::{
14 DeleteOnDropGauge, IntGaugeVec, MetricTag, MetricVisibility, MetricsRegistry, UIntGaugeVec,
15};
16use mz_repr::GlobalId;
17use prometheus::core::{AtomicI64, AtomicU64};
18
/// Definitions of the Kafka sink metrics, one gauge vector per metric.
///
/// Each vector is registered by [`KafkaSinkMetricDefs::register_with`] with a
/// single variable label, `sink_id`. The field descriptions below mirror the
/// `help` text supplied at registration.
#[derive(Clone, Debug)]
pub(crate) struct KafkaSinkMetricDefs {
    /// The current number of messages in producer queues.
    pub rdkafka_msg_cnt: UIntGaugeVec,
    /// The current total size of messages in producer queues.
    pub rdkafka_msg_size: UIntGaugeVec,
    /// The total number of messages transmitted (produced) to brokers.
    pub rdkafka_txmsgs: IntGaugeVec,
    /// The total number of bytes transmitted (produced) to brokers.
    pub rdkafka_txmsg_bytes: IntGaugeVec,
    /// The total number of requests sent to brokers.
    pub rdkafka_tx: IntGaugeVec,
    /// The total number of bytes transmitted to brokers.
    pub rdkafka_tx_bytes: IntGaugeVec,
    /// The number of requests awaiting transmission across all brokers.
    pub rdkafka_outbuf_cnt: IntGaugeVec,
    /// The number of messages awaiting transmission across all brokers.
    pub rdkafka_outbuf_msg_cnt: IntGaugeVec,
    /// The number of requests in-flight across all brokers that are awaiting a
    /// response.
    pub rdkafka_waitresp_cnt: IntGaugeVec,
    /// The number of messages in-flight across all brokers that are awaiting a
    /// response.
    pub rdkafka_waitresp_msg_cnt: IntGaugeVec,
    /// The total number of transmission errors across all brokers.
    pub rdkafka_txerrs: UIntGaugeVec,
    /// The total number of request retries across all brokers.
    pub rdkafka_txretries: UIntGaugeVec,
    /// The total number of requests that timed out across all brokers.
    pub rdkafka_req_timeouts: UIntGaugeVec,
    /// The number of connection attempts, including successful and failed
    /// attempts, and name resolution failures across all brokers.
    pub rdkafka_connects: IntGaugeVec,
    /// The number of disconnections, whether triggered by the broker, the
    /// network, the load balancer, or something else across all brokers.
    pub rdkafka_disconnects: IntGaugeVec,
    /// The number of outstanding progress records that need to be read before
    /// the sink can resume.
    pub outstanding_progress_records: UIntGaugeVec,
    /// The number of progress records consumed by the sink.
    pub consumed_progress_records: UIntGaugeVec,
    /// The number of partitions this sink is publishing to.
    pub partition_count: UIntGaugeVec,
}
61
62impl KafkaSinkMetricDefs {
63 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
64 Self {
65 rdkafka_msg_cnt: registry.register(metric!(
66 name: "mz_sink_rdkafka_msg_cnt",
67 help: "The current number of messages in producer queues.",
68 var_labels: ["sink_id"],
69 )),
70 rdkafka_msg_size: registry.register(metric!(
71 name: "mz_sink_rdkafka_msg_size",
72 help: "The current total size of messages in producer queues.",
73 var_labels: ["sink_id"],
74 )),
75 rdkafka_txmsgs: registry.register(metric!(
76 name: "mz_sink_rdkafka_txmsgs",
77 help: "The total number of messages transmitted (produced) to brokers.",
78 var_labels: ["sink_id"],
79 )),
80 rdkafka_txmsg_bytes: registry.register(metric!(
81 name: "mz_sink_rdkafka_txmsg_bytes",
82 help: "The total number of bytes transmitted (produced) to brokers.",
83 var_labels: ["sink_id"],
84 )),
85 rdkafka_tx: registry.register(metric!(
86 name: "mz_sink_rdkafka_tx",
87 help: "The total number of requests sent to brokers.",
88 var_labels: ["sink_id"],
89 )),
90 rdkafka_tx_bytes: registry.register(metric!(
91 name: "mz_sink_rdkafka_tx_bytes",
92 help: "The total number of bytes transmitted to brokers.",
93 var_labels: ["sink_id"],
94 )),
95 rdkafka_outbuf_cnt: registry.register(metric!(
96 name: "mz_sink_rdkafka_outbuf_cnt",
97 help: "The number of requests awaiting transmission across all brokers.",
98 var_labels: ["sink_id"],
99 )),
100 rdkafka_outbuf_msg_cnt: registry.register(metric!(
101 name: "mz_sink_rdkafka_outbuf_msg_cnt",
102 help: "The number of messages awaiting transmission across all brokers.",
103 var_labels: ["sink_id"],
104 visibility: MetricVisibility::Public,
105 tags: [MetricTag::Sink],
106 )),
107 rdkafka_waitresp_cnt: registry.register(metric!(
108 name: "mz_sink_rdkafka_waitresp_cnt",
109 help: "The number of requests in-flight across all brokers that are awaiting a \
110 response.",
111 var_labels: ["sink_id"],
112 )),
113 rdkafka_waitresp_msg_cnt: registry.register(metric!(
114 name: "mz_sink_rdkafka_waitresp_msg_cnt",
115 help: "The number of messages in-flight across all brokers that are awaiting a \
116 response.",
117 var_labels: ["sink_id"],
118 )),
119 rdkafka_txerrs: registry.register(metric!(
120 name: "mz_sink_rdkafka_txerrs",
121 help: "The total number of transmission errors across all brokers.",
122 var_labels: ["sink_id"],
123 visibility: MetricVisibility::Public,
124 tags: [MetricTag::Sink],
125 )),
126 rdkafka_txretries: registry.register(metric!(
127 name: "mz_sink_rdkafka_txretries",
128 help: "The total number of request retries across all brokers.",
129 var_labels: ["sink_id"],
130 )),
131 rdkafka_req_timeouts: registry.register(metric!(
132 name: "mz_sink_rdkafka_req_timeouts",
133 help: "The total number of requests that timed out across all brokers.",
134 var_labels: ["sink_id"],
135 )),
136 rdkafka_connects: registry.register(metric!(
137 name: "mz_sink_rdkafka_connects",
138 help: "The number of connection attempts, including successful and failed \
139 attempts, and name resolution failures across all brokers.",
140 var_labels: ["sink_id"],
141 visibility: MetricVisibility::Public,
142 tags: [MetricTag::Sink],
143 )),
144 rdkafka_disconnects: registry.register(metric!(
145 name: "mz_sink_rdkafka_disconnects",
146 help: "The number of disconnections, whether triggered by the broker, the \
147 network, the load balancer, or something else across all brokers.",
148 var_labels: ["sink_id"],
149 visibility: MetricVisibility::Public,
150 tags: [MetricTag::Sink],
151 )),
152 outstanding_progress_records: registry.register(metric!(
153 name: "mz_sink_oustanding_progress_records",
154 help: "The number of outstanding progress records that need to be read before the sink can resume.",
155 var_labels: ["sink_id"],
156 )),
157 consumed_progress_records: registry.register(metric!(
158 name: "mz_sink_consumed_progress_records",
159 help: "The number of progress records consumed by the sink.",
160 var_labels: ["sink_id"],
161 )),
162 partition_count: registry.register(metric!(
163 name: "mz_sink_partition_count",
164 help: "The number of partitions this sink is publishing to.",
165 var_labels: ["sink_id"],
166 )),
167 }
168 }
169}
170
/// Metric handles for a single Kafka sink.
///
/// Each field is obtained in [`KafkaSinkMetrics::new`] from the same-named
/// vector in [`KafkaSinkMetricDefs`], using the sink's ID as the only label
/// value. The field descriptions below mirror the `help` text of the
/// corresponding definition.
///
/// NOTE(review): judging by the `DeleteOnDropGauge` type name, the labeled
/// series is presumably removed from its vector when the handle is dropped —
/// confirm against `mz_ore::metrics`.
pub(crate) struct KafkaSinkMetrics {
    /// The current number of messages in producer queues.
    pub rdkafka_msg_cnt: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// The current total size of messages in producer queues.
    pub rdkafka_msg_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// The total number of messages transmitted (produced) to brokers.
    pub rdkafka_txmsgs: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The total number of bytes transmitted (produced) to brokers.
    pub rdkafka_txmsg_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The total number of requests sent to brokers.
    pub rdkafka_tx: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The total number of bytes transmitted to brokers.
    pub rdkafka_tx_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The number of requests awaiting transmission across all brokers.
    pub rdkafka_outbuf_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The number of messages awaiting transmission across all brokers.
    pub rdkafka_outbuf_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The number of requests in-flight across all brokers that are awaiting a
    /// response.
    pub rdkafka_waitresp_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The number of messages in-flight across all brokers that are awaiting a
    /// response.
    pub rdkafka_waitresp_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The total number of transmission errors across all brokers.
    pub rdkafka_txerrs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// The total number of request retries across all brokers.
    pub rdkafka_txretries: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// The total number of requests that timed out across all brokers.
    pub rdkafka_req_timeouts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// The number of connection attempts, including successful and failed
    /// attempts, and name resolution failures across all brokers.
    pub rdkafka_connects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The number of disconnections, whether triggered by the broker, the
    /// network, the load balancer, or something else across all brokers.
    pub rdkafka_disconnects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// The number of outstanding progress records that need to be read before
    /// the sink can resume.
    pub outstanding_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// The number of progress records consumed by the sink.
    pub consumed_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// The number of partitions this sink is publishing to.
    pub partition_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}
212
213impl KafkaSinkMetrics {
214 pub fn new(defs: &KafkaSinkMetricDefs, sink_id: GlobalId) -> Self {
216 let labels = &[sink_id.to_string()];
217 Self {
218 rdkafka_msg_cnt: defs
219 .rdkafka_msg_cnt
220 .get_delete_on_drop_metric(labels.to_vec()),
221 rdkafka_msg_size: defs
222 .rdkafka_msg_size
223 .get_delete_on_drop_metric(labels.to_vec()),
224 rdkafka_txmsgs: defs
225 .rdkafka_txmsgs
226 .get_delete_on_drop_metric(labels.to_vec()),
227 rdkafka_txmsg_bytes: defs
228 .rdkafka_txmsg_bytes
229 .get_delete_on_drop_metric(labels.to_vec()),
230 rdkafka_tx: defs.rdkafka_tx.get_delete_on_drop_metric(labels.to_vec()),
231 rdkafka_tx_bytes: defs
232 .rdkafka_tx_bytes
233 .get_delete_on_drop_metric(labels.to_vec()),
234 rdkafka_outbuf_cnt: defs
235 .rdkafka_outbuf_cnt
236 .get_delete_on_drop_metric(labels.to_vec()),
237 rdkafka_outbuf_msg_cnt: defs
238 .rdkafka_outbuf_msg_cnt
239 .get_delete_on_drop_metric(labels.to_vec()),
240 rdkafka_waitresp_cnt: defs
241 .rdkafka_waitresp_cnt
242 .get_delete_on_drop_metric(labels.to_vec()),
243 rdkafka_waitresp_msg_cnt: defs
244 .rdkafka_waitresp_msg_cnt
245 .get_delete_on_drop_metric(labels.to_vec()),
246 rdkafka_txerrs: defs
247 .rdkafka_txerrs
248 .get_delete_on_drop_metric(labels.to_vec()),
249 rdkafka_txretries: defs
250 .rdkafka_txretries
251 .get_delete_on_drop_metric(labels.to_vec()),
252 rdkafka_req_timeouts: defs
253 .rdkafka_req_timeouts
254 .get_delete_on_drop_metric(labels.to_vec()),
255 rdkafka_connects: defs
256 .rdkafka_connects
257 .get_delete_on_drop_metric(labels.to_vec()),
258 rdkafka_disconnects: defs
259 .rdkafka_disconnects
260 .get_delete_on_drop_metric(labels.to_vec()),
261 outstanding_progress_records: defs
262 .outstanding_progress_records
263 .get_delete_on_drop_metric(labels.to_vec()),
264 consumed_progress_records: defs
265 .consumed_progress_records
266 .get_delete_on_drop_metric(labels.to_vec()),
267 partition_count: defs
268 .partition_count
269 .get_delete_on_drop_metric(labels.to_vec()),
270 }
271 }
272}