mz_storage/metrics/sink/
kafka.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for kafka sinks.
11
12use mz_ore::metric;
13use mz_ore::metrics::{DeleteOnDropGauge, IntGaugeVec, MetricsRegistry, UIntGaugeVec};
14use mz_repr::GlobalId;
15use prometheus::core::{AtomicI64, AtomicU64};
16
17/// Definitions for librdkafka produced metrics used in sinks.
18#[derive(Clone, Debug)]
19pub(crate) struct KafkaSinkMetricDefs {
20    /// The current number of messages in producer queues.
21    pub rdkafka_msg_cnt: UIntGaugeVec,
22    /// The current total size of messages in producer queues.
23    pub rdkafka_msg_size: UIntGaugeVec,
24    /// The total number of messages transmitted (produced) to brokers.
25    pub rdkafka_txmsgs: IntGaugeVec,
26    /// The total number of bytes transmitted (produced) to brokers.
27    pub rdkafka_txmsg_bytes: IntGaugeVec,
28    /// The total number of requests sent to brokers.
29    pub rdkafka_tx: IntGaugeVec,
30    /// The total number of bytes transmitted to brokers.
31    pub rdkafka_tx_bytes: IntGaugeVec,
32    /// The number of requests awaiting transmission across all brokers.
33    pub rdkafka_outbuf_cnt: IntGaugeVec,
34    /// The number of messages awaiting transmission across all brokers.
35    pub rdkafka_outbuf_msg_cnt: IntGaugeVec,
36    /// The number of requests in-flight across all brokers that are awaiting a response.
37    pub rdkafka_waitresp_cnt: IntGaugeVec,
38    /// The number of messages in-flight across all brokers that are awaiting a response.
39    pub rdkafka_waitresp_msg_cnt: IntGaugeVec,
40    /// The total number of transmission errors across all brokers.
41    pub rdkafka_txerrs: UIntGaugeVec,
42    /// The total number of request retries across all brokers.
43    pub rdkafka_txretries: UIntGaugeVec,
44    /// The total number of requests that timed out across all brokers.
45    pub rdkafka_req_timeouts: UIntGaugeVec,
46    /// The number of connection attempts, including successful and failed attempts, and name
47    /// resolution failures across all brokers.
48    pub rdkafka_connects: IntGaugeVec,
49    /// The number of disconnections, whether triggered by the broker, the network, the load
50    /// balancer, or something else across all brokers.
51    pub rdkafka_disconnects: IntGaugeVec,
52    /// The number of outstanding progress records that need to be read before the sink can resume.
53    pub outstanding_progress_records: UIntGaugeVec,
54    /// The number of progress records consumed while resuming the sink.
55    pub consumed_progress_records: UIntGaugeVec,
56    /// The number of partitions this sink is publishing to.
57    pub partition_count: UIntGaugeVec,
58}
59
60impl KafkaSinkMetricDefs {
61    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
62        Self {
63            rdkafka_msg_cnt: registry.register(metric!(
64                name: "mz_sink_rdkafka_msg_cnt",
65                help: "The current number of messages in producer queues.",
66                var_labels: ["sink_id"],
67            )),
68            rdkafka_msg_size: registry.register(metric!(
69                name: "mz_sink_rdkafka_msg_size",
70                help: "The current total size of messages in producer queues.",
71                var_labels: ["sink_id"],
72            )),
73            rdkafka_txmsgs: registry.register(metric!(
74                name: "mz_sink_rdkafka_txmsgs",
75                help: "The total number of messages transmitted (produced) to brokers.",
76                var_labels: ["sink_id"],
77            )),
78            rdkafka_txmsg_bytes: registry.register(metric!(
79                name: "mz_sink_rdkafka_txmsg_bytes",
80                help: "The total number of bytes transmitted (produced) to brokers.",
81                var_labels: ["sink_id"],
82            )),
83            rdkafka_tx: registry.register(metric!(
84                name: "mz_sink_rdkafka_tx",
85                help: "The total number of requests sent to brokers.",
86                var_labels: ["sink_id"],
87            )),
88            rdkafka_tx_bytes: registry.register(metric!(
89                name: "mz_sink_rdkafka_tx_bytes",
90                help: "The total number of bytes transmitted to brokers.",
91                var_labels: ["sink_id"],
92            )),
93            rdkafka_outbuf_cnt: registry.register(metric!(
94                name: "mz_sink_rdkafka_outbuf_cnt",
95                help: "The number of requests awaiting transmission across all brokers.",
96                var_labels: ["sink_id"],
97            )),
98            rdkafka_outbuf_msg_cnt: registry.register(metric!(
99                name: "mz_sink_rdkafka_outbuf_msg_cnt",
100                help: "The number of messages awaiting transmission across all brokers.",
101                var_labels: ["sink_id"],
102            )),
103            rdkafka_waitresp_cnt: registry.register(metric!(
104                name: "mz_sink_rdkafka_waitresp_cnt",
105                help: "The number of requests in-flight across all brokers that are awaiting a \
106                       response.",
107                var_labels: ["sink_id"],
108            )),
109            rdkafka_waitresp_msg_cnt: registry.register(metric!(
110                name: "mz_sink_rdkafka_waitresp_msg_cnt",
111                help: "The number of messages in-flight across all brokers that are awaiting a \
112                       response.",
113                var_labels: ["sink_id"],
114            )),
115            rdkafka_txerrs: registry.register(metric!(
116                name: "mz_sink_rdkafka_txerrs",
117                help: "The total number of transmission errors across all brokers.",
118                var_labels: ["sink_id"],
119            )),
120            rdkafka_txretries: registry.register(metric!(
121                name: "mz_sink_rdkafka_txretries",
122                help: "The total number of request retries across all brokers.",
123                var_labels: ["sink_id"],
124            )),
125            rdkafka_req_timeouts: registry.register(metric!(
126                name: "mz_sink_rdkafka_req_timeouts",
127                help: "The total number of requests that timed out across all brokers.",
128                var_labels: ["sink_id"],
129            )),
130            rdkafka_connects: registry.register(metric!(
131                name: "mz_sink_rdkafka_connects",
132                help: "The number of connection attempts, including successful and failed \
133                       attempts, and name resolution failures across all brokers.",
134                var_labels: ["sink_id"],
135            )),
136            rdkafka_disconnects: registry.register(metric!(
137                name: "mz_sink_rdkafka_disconnects",
138                help: "The number of disconnections, whether triggered by the broker, the \
139                      network, the load balancer, or something else across all brokers.",
140                var_labels: ["sink_id"],
141            )),
142            outstanding_progress_records: registry.register(metric!(
143                name: "mz_sink_oustanding_progress_records",
144                help: "The number of outstanding progress records that need to be read before the sink can resume.",
145                var_labels: ["sink_id"],
146            )),
147            consumed_progress_records: registry.register(metric!(
148                name: "mz_sink_consumed_progress_records",
149                help: "The number of progress records consumed by the sink.",
150                var_labels: ["sink_id"],
151            )),
152            partition_count: registry.register(metric!(
153                name: "mz_sink_partition_count",
154                help: "The number of partitions this sink is publishing to.",
155                var_labels: ["sink_id"],
156            )),
157        }
158    }
159}
160
161/// Metrics reported by librdkafka
162pub(crate) struct KafkaSinkMetrics {
163    /// The current number of messages in producer queues.
164    pub rdkafka_msg_cnt: DeleteOnDropGauge<AtomicU64, Vec<String>>,
165    /// The current total size of messages in producer queues.
166    pub rdkafka_msg_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
167    /// The total number of messages transmitted (produced) to brokers.
168    pub rdkafka_txmsgs: DeleteOnDropGauge<AtomicI64, Vec<String>>,
169    /// The total number of bytes transmitted (produced) to brokers.
170    pub rdkafka_txmsg_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
171    /// The total number of requests sent to brokers.
172    pub rdkafka_tx: DeleteOnDropGauge<AtomicI64, Vec<String>>,
173    /// The total number of bytes transmitted to brokers.
174    pub rdkafka_tx_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
175    /// The number of requests awaiting transmission across all brokers.
176    pub rdkafka_outbuf_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
177    /// The number of messages awaiting transmission across all brokers.
178    pub rdkafka_outbuf_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
179    /// The number of requests in-flight across all brokers that are awaiting a response.
180    pub rdkafka_waitresp_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
181    /// The number of messages in-flight across all brokers that are awaiting a response.
182    pub rdkafka_waitresp_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
183    /// The total number of transmission errors across all brokers.
184    pub rdkafka_txerrs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
185    /// The total number of request retries across all brokers.
186    pub rdkafka_txretries: DeleteOnDropGauge<AtomicU64, Vec<String>>,
187    /// The total number of requests that timed out across all brokers.
188    pub rdkafka_req_timeouts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
189    /// The number of connection attempts, including successful and failed attempts, and name
190    /// resolution failures across all brokers.
191    pub rdkafka_connects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
192    /// The number of disconnections, whether triggered by the broker, the network, the load
193    /// balancer, or something else across all brokers.
194    pub rdkafka_disconnects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
195    /// The number of outstanding progress records that need to be read before the sink can resume.
196    pub outstanding_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
197    /// The number of progress records consumed while resuming the sink.
198    pub consumed_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
199    /// The number of partitions this sink is publishing to.
200    pub partition_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
201}
202
203impl KafkaSinkMetrics {
204    /// Initializes source metrics for a given (source_id, worker_id)
205    pub fn new(defs: &KafkaSinkMetricDefs, sink_id: GlobalId) -> Self {
206        let labels = &[sink_id.to_string()];
207        Self {
208            rdkafka_msg_cnt: defs
209                .rdkafka_msg_cnt
210                .get_delete_on_drop_metric(labels.to_vec()),
211            rdkafka_msg_size: defs
212                .rdkafka_msg_size
213                .get_delete_on_drop_metric(labels.to_vec()),
214            rdkafka_txmsgs: defs
215                .rdkafka_txmsgs
216                .get_delete_on_drop_metric(labels.to_vec()),
217            rdkafka_txmsg_bytes: defs
218                .rdkafka_txmsg_bytes
219                .get_delete_on_drop_metric(labels.to_vec()),
220            rdkafka_tx: defs.rdkafka_tx.get_delete_on_drop_metric(labels.to_vec()),
221            rdkafka_tx_bytes: defs
222                .rdkafka_tx_bytes
223                .get_delete_on_drop_metric(labels.to_vec()),
224            rdkafka_outbuf_cnt: defs
225                .rdkafka_outbuf_cnt
226                .get_delete_on_drop_metric(labels.to_vec()),
227            rdkafka_outbuf_msg_cnt: defs
228                .rdkafka_outbuf_msg_cnt
229                .get_delete_on_drop_metric(labels.to_vec()),
230            rdkafka_waitresp_cnt: defs
231                .rdkafka_waitresp_cnt
232                .get_delete_on_drop_metric(labels.to_vec()),
233            rdkafka_waitresp_msg_cnt: defs
234                .rdkafka_waitresp_msg_cnt
235                .get_delete_on_drop_metric(labels.to_vec()),
236            rdkafka_txerrs: defs
237                .rdkafka_txerrs
238                .get_delete_on_drop_metric(labels.to_vec()),
239            rdkafka_txretries: defs
240                .rdkafka_txretries
241                .get_delete_on_drop_metric(labels.to_vec()),
242            rdkafka_req_timeouts: defs
243                .rdkafka_req_timeouts
244                .get_delete_on_drop_metric(labels.to_vec()),
245            rdkafka_connects: defs
246                .rdkafka_connects
247                .get_delete_on_drop_metric(labels.to_vec()),
248            rdkafka_disconnects: defs
249                .rdkafka_disconnects
250                .get_delete_on_drop_metric(labels.to_vec()),
251            outstanding_progress_records: defs
252                .outstanding_progress_records
253                .get_delete_on_drop_metric(labels.to_vec()),
254            consumed_progress_records: defs
255                .consumed_progress_records
256                .get_delete_on_drop_metric(labels.to_vec()),
257            partition_count: defs
258                .partition_count
259                .get_delete_on_drop_metric(labels.to_vec()),
260        }
261    }
262}