Skip to main content

mz_storage/metrics/sink/
kafka.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for kafka sinks.
11
12use mz_ore::metric;
13use mz_ore::metrics::{
14    DeleteOnDropGauge, IntGaugeVec, MetricVisibility, MetricsRegistry, UIntGaugeVec,
15};
16use mz_repr::GlobalId;
17use prometheus::core::{AtomicI64, AtomicU64};
18
19/// Definitions for librdkafka produced metrics used in sinks.
20#[derive(Clone, Debug)]
21pub(crate) struct KafkaSinkMetricDefs {
22    /// The current number of messages in producer queues.
23    pub rdkafka_msg_cnt: UIntGaugeVec,
24    /// The current total size of messages in producer queues.
25    pub rdkafka_msg_size: UIntGaugeVec,
26    /// The total number of messages transmitted (produced) to brokers.
27    pub rdkafka_txmsgs: IntGaugeVec,
28    /// The total number of bytes transmitted (produced) to brokers.
29    pub rdkafka_txmsg_bytes: IntGaugeVec,
30    /// The total number of requests sent to brokers.
31    pub rdkafka_tx: IntGaugeVec,
32    /// The total number of bytes transmitted to brokers.
33    pub rdkafka_tx_bytes: IntGaugeVec,
34    /// The number of requests awaiting transmission across all brokers.
35    pub rdkafka_outbuf_cnt: IntGaugeVec,
36    /// The number of messages awaiting transmission across all brokers.
37    pub rdkafka_outbuf_msg_cnt: IntGaugeVec,
38    /// The number of requests in-flight across all brokers that are awaiting a response.
39    pub rdkafka_waitresp_cnt: IntGaugeVec,
40    /// The number of messages in-flight across all brokers that are awaiting a response.
41    pub rdkafka_waitresp_msg_cnt: IntGaugeVec,
42    /// The total number of transmission errors across all brokers.
43    pub rdkafka_txerrs: UIntGaugeVec,
44    /// The total number of request retries across all brokers.
45    pub rdkafka_txretries: UIntGaugeVec,
46    /// The total number of requests that timed out across all brokers.
47    pub rdkafka_req_timeouts: UIntGaugeVec,
48    /// The number of connection attempts, including successful and failed attempts, and name
49    /// resolution failures across all brokers.
50    pub rdkafka_connects: IntGaugeVec,
51    /// The number of disconnections, whether triggered by the broker, the network, the load
52    /// balancer, or something else across all brokers.
53    pub rdkafka_disconnects: IntGaugeVec,
54    /// The number of outstanding progress records that need to be read before the sink can resume.
55    pub outstanding_progress_records: UIntGaugeVec,
56    /// The number of progress records consumed while resuming the sink.
57    pub consumed_progress_records: UIntGaugeVec,
58    /// The number of partitions this sink is publishing to.
59    pub partition_count: UIntGaugeVec,
60}
61
62impl KafkaSinkMetricDefs {
63    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
64        Self {
65            rdkafka_msg_cnt: registry.register(metric!(
66                name: "mz_sink_rdkafka_msg_cnt",
67                help: "The current number of messages in producer queues.",
68                var_labels: ["sink_id"],
69            )),
70            rdkafka_msg_size: registry.register(metric!(
71                name: "mz_sink_rdkafka_msg_size",
72                help: "The current total size of messages in producer queues.",
73                var_labels: ["sink_id"],
74            )),
75            rdkafka_txmsgs: registry.register(metric!(
76                name: "mz_sink_rdkafka_txmsgs",
77                help: "The total number of messages transmitted (produced) to brokers.",
78                var_labels: ["sink_id"],
79            )),
80            rdkafka_txmsg_bytes: registry.register(metric!(
81                name: "mz_sink_rdkafka_txmsg_bytes",
82                help: "The total number of bytes transmitted (produced) to brokers.",
83                var_labels: ["sink_id"],
84            )),
85            rdkafka_tx: registry.register(metric!(
86                name: "mz_sink_rdkafka_tx",
87                help: "The total number of requests sent to brokers.",
88                var_labels: ["sink_id"],
89            )),
90            rdkafka_tx_bytes: registry.register(metric!(
91                name: "mz_sink_rdkafka_tx_bytes",
92                help: "The total number of bytes transmitted to brokers.",
93                var_labels: ["sink_id"],
94            )),
95            rdkafka_outbuf_cnt: registry.register(metric!(
96                name: "mz_sink_rdkafka_outbuf_cnt",
97                help: "The number of requests awaiting transmission across all brokers.",
98                var_labels: ["sink_id"],
99            )),
100            rdkafka_outbuf_msg_cnt: registry.register(metric!(
101                name: "mz_sink_rdkafka_outbuf_msg_cnt",
102                help: "The number of messages awaiting transmission across all brokers.",
103                var_labels: ["sink_id"],
104                visibility: MetricVisibility::Public,
105            )),
106            rdkafka_waitresp_cnt: registry.register(metric!(
107                name: "mz_sink_rdkafka_waitresp_cnt",
108                help: "The number of requests in-flight across all brokers that are awaiting a \
109                       response.",
110                var_labels: ["sink_id"],
111            )),
112            rdkafka_waitresp_msg_cnt: registry.register(metric!(
113                name: "mz_sink_rdkafka_waitresp_msg_cnt",
114                help: "The number of messages in-flight across all brokers that are awaiting a \
115                       response.",
116                var_labels: ["sink_id"],
117            )),
118            rdkafka_txerrs: registry.register(metric!(
119                name: "mz_sink_rdkafka_txerrs",
120                help: "The total number of transmission errors across all brokers.",
121                var_labels: ["sink_id"],
122                visibility: MetricVisibility::Public,
123            )),
124            rdkafka_txretries: registry.register(metric!(
125                name: "mz_sink_rdkafka_txretries",
126                help: "The total number of request retries across all brokers.",
127                var_labels: ["sink_id"],
128            )),
129            rdkafka_req_timeouts: registry.register(metric!(
130                name: "mz_sink_rdkafka_req_timeouts",
131                help: "The total number of requests that timed out across all brokers.",
132                var_labels: ["sink_id"],
133            )),
134            rdkafka_connects: registry.register(metric!(
135                name: "mz_sink_rdkafka_connects",
136                help: "The number of connection attempts, including successful and failed \
137                       attempts, and name resolution failures across all brokers.",
138                var_labels: ["sink_id"],
139                visibility: MetricVisibility::Public,
140            )),
141            rdkafka_disconnects: registry.register(metric!(
142                name: "mz_sink_rdkafka_disconnects",
143                help: "The number of disconnections, whether triggered by the broker, the \
144                      network, the load balancer, or something else across all brokers.",
145                var_labels: ["sink_id"],
146                visibility: MetricVisibility::Public,
147            )),
148            outstanding_progress_records: registry.register(metric!(
149                name: "mz_sink_oustanding_progress_records",
150                help: "The number of outstanding progress records that need to be read before the sink can resume.",
151                var_labels: ["sink_id"],
152            )),
153            consumed_progress_records: registry.register(metric!(
154                name: "mz_sink_consumed_progress_records",
155                help: "The number of progress records consumed by the sink.",
156                var_labels: ["sink_id"],
157            )),
158            partition_count: registry.register(metric!(
159                name: "mz_sink_partition_count",
160                help: "The number of partitions this sink is publishing to.",
161                var_labels: ["sink_id"],
162            )),
163        }
164    }
165}
166
167/// Metrics reported by librdkafka
168pub(crate) struct KafkaSinkMetrics {
169    /// The current number of messages in producer queues.
170    pub rdkafka_msg_cnt: DeleteOnDropGauge<AtomicU64, Vec<String>>,
171    /// The current total size of messages in producer queues.
172    pub rdkafka_msg_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
173    /// The total number of messages transmitted (produced) to brokers.
174    pub rdkafka_txmsgs: DeleteOnDropGauge<AtomicI64, Vec<String>>,
175    /// The total number of bytes transmitted (produced) to brokers.
176    pub rdkafka_txmsg_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
177    /// The total number of requests sent to brokers.
178    pub rdkafka_tx: DeleteOnDropGauge<AtomicI64, Vec<String>>,
179    /// The total number of bytes transmitted to brokers.
180    pub rdkafka_tx_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
181    /// The number of requests awaiting transmission across all brokers.
182    pub rdkafka_outbuf_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
183    /// The number of messages awaiting transmission across all brokers.
184    pub rdkafka_outbuf_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
185    /// The number of requests in-flight across all brokers that are awaiting a response.
186    pub rdkafka_waitresp_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
187    /// The number of messages in-flight across all brokers that are awaiting a response.
188    pub rdkafka_waitresp_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
189    /// The total number of transmission errors across all brokers.
190    pub rdkafka_txerrs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
191    /// The total number of request retries across all brokers.
192    pub rdkafka_txretries: DeleteOnDropGauge<AtomicU64, Vec<String>>,
193    /// The total number of requests that timed out across all brokers.
194    pub rdkafka_req_timeouts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
195    /// The number of connection attempts, including successful and failed attempts, and name
196    /// resolution failures across all brokers.
197    pub rdkafka_connects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
198    /// The number of disconnections, whether triggered by the broker, the network, the load
199    /// balancer, or something else across all brokers.
200    pub rdkafka_disconnects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
201    /// The number of outstanding progress records that need to be read before the sink can resume.
202    pub outstanding_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
203    /// The number of progress records consumed while resuming the sink.
204    pub consumed_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
205    /// The number of partitions this sink is publishing to.
206    pub partition_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
207}
208
209impl KafkaSinkMetrics {
210    /// Initializes source metrics for a given (source_id, worker_id)
211    pub fn new(defs: &KafkaSinkMetricDefs, sink_id: GlobalId) -> Self {
212        let labels = &[sink_id.to_string()];
213        Self {
214            rdkafka_msg_cnt: defs
215                .rdkafka_msg_cnt
216                .get_delete_on_drop_metric(labels.to_vec()),
217            rdkafka_msg_size: defs
218                .rdkafka_msg_size
219                .get_delete_on_drop_metric(labels.to_vec()),
220            rdkafka_txmsgs: defs
221                .rdkafka_txmsgs
222                .get_delete_on_drop_metric(labels.to_vec()),
223            rdkafka_txmsg_bytes: defs
224                .rdkafka_txmsg_bytes
225                .get_delete_on_drop_metric(labels.to_vec()),
226            rdkafka_tx: defs.rdkafka_tx.get_delete_on_drop_metric(labels.to_vec()),
227            rdkafka_tx_bytes: defs
228                .rdkafka_tx_bytes
229                .get_delete_on_drop_metric(labels.to_vec()),
230            rdkafka_outbuf_cnt: defs
231                .rdkafka_outbuf_cnt
232                .get_delete_on_drop_metric(labels.to_vec()),
233            rdkafka_outbuf_msg_cnt: defs
234                .rdkafka_outbuf_msg_cnt
235                .get_delete_on_drop_metric(labels.to_vec()),
236            rdkafka_waitresp_cnt: defs
237                .rdkafka_waitresp_cnt
238                .get_delete_on_drop_metric(labels.to_vec()),
239            rdkafka_waitresp_msg_cnt: defs
240                .rdkafka_waitresp_msg_cnt
241                .get_delete_on_drop_metric(labels.to_vec()),
242            rdkafka_txerrs: defs
243                .rdkafka_txerrs
244                .get_delete_on_drop_metric(labels.to_vec()),
245            rdkafka_txretries: defs
246                .rdkafka_txretries
247                .get_delete_on_drop_metric(labels.to_vec()),
248            rdkafka_req_timeouts: defs
249                .rdkafka_req_timeouts
250                .get_delete_on_drop_metric(labels.to_vec()),
251            rdkafka_connects: defs
252                .rdkafka_connects
253                .get_delete_on_drop_metric(labels.to_vec()),
254            rdkafka_disconnects: defs
255                .rdkafka_disconnects
256                .get_delete_on_drop_metric(labels.to_vec()),
257            outstanding_progress_records: defs
258                .outstanding_progress_records
259                .get_delete_on_drop_metric(labels.to_vec()),
260            consumed_progress_records: defs
261                .consumed_progress_records
262                .get_delete_on_drop_metric(labels.to_vec()),
263            partition_count: defs
264                .partition_count
265                .get_delete_on_drop_metric(labels.to_vec()),
266        }
267    }
268}