Skip to main content

mz_storage/metrics/sink/
kafka.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Metrics for Kafka sinks.
11
12use mz_ore::metric;
13use mz_ore::metrics::{
14    DeleteOnDropGauge, IntGaugeVec, MetricTag, MetricVisibility, MetricsRegistry, UIntGaugeVec,
15};
16use mz_repr::GlobalId;
17use prometheus::core::{AtomicI64, AtomicU64};
18
19/// Definitions for librdkafka produced metrics used in sinks.
20#[derive(Clone, Debug)]
21pub(crate) struct KafkaSinkMetricDefs {
22    /// The current number of messages in producer queues.
23    pub rdkafka_msg_cnt: UIntGaugeVec,
24    /// The current total size of messages in producer queues.
25    pub rdkafka_msg_size: UIntGaugeVec,
26    /// The total number of messages transmitted (produced) to brokers.
27    pub rdkafka_txmsgs: IntGaugeVec,
28    /// The total number of bytes transmitted (produced) to brokers.
29    pub rdkafka_txmsg_bytes: IntGaugeVec,
30    /// The total number of requests sent to brokers.
31    pub rdkafka_tx: IntGaugeVec,
32    /// The total number of bytes transmitted to brokers.
33    pub rdkafka_tx_bytes: IntGaugeVec,
34    /// The number of requests awaiting transmission across all brokers.
35    pub rdkafka_outbuf_cnt: IntGaugeVec,
36    /// The number of messages awaiting transmission across all brokers.
37    pub rdkafka_outbuf_msg_cnt: IntGaugeVec,
38    /// The number of requests in-flight across all brokers that are awaiting a response.
39    pub rdkafka_waitresp_cnt: IntGaugeVec,
40    /// The number of messages in-flight across all brokers that are awaiting a response.
41    pub rdkafka_waitresp_msg_cnt: IntGaugeVec,
42    /// The total number of transmission errors across all brokers.
43    pub rdkafka_txerrs: UIntGaugeVec,
44    /// The total number of request retries across all brokers.
45    pub rdkafka_txretries: UIntGaugeVec,
46    /// The total number of requests that timed out across all brokers.
47    pub rdkafka_req_timeouts: UIntGaugeVec,
48    /// The number of connection attempts, including successful and failed attempts, and name
49    /// resolution failures across all brokers.
50    pub rdkafka_connects: IntGaugeVec,
51    /// The number of disconnections, whether triggered by the broker, the network, the load
52    /// balancer, or something else across all brokers.
53    pub rdkafka_disconnects: IntGaugeVec,
54    /// The number of outstanding progress records that need to be read before the sink can resume.
55    pub outstanding_progress_records: UIntGaugeVec,
56    /// The number of progress records consumed while resuming the sink.
57    pub consumed_progress_records: UIntGaugeVec,
58    /// The number of partitions this sink is publishing to.
59    pub partition_count: UIntGaugeVec,
60}
61
62impl KafkaSinkMetricDefs {
63    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
64        Self {
65            rdkafka_msg_cnt: registry.register(metric!(
66                name: "mz_sink_rdkafka_msg_cnt",
67                help: "The current number of messages in producer queues.",
68                var_labels: ["sink_id"],
69            )),
70            rdkafka_msg_size: registry.register(metric!(
71                name: "mz_sink_rdkafka_msg_size",
72                help: "The current total size of messages in producer queues.",
73                var_labels: ["sink_id"],
74            )),
75            rdkafka_txmsgs: registry.register(metric!(
76                name: "mz_sink_rdkafka_txmsgs",
77                help: "The total number of messages transmitted (produced) to brokers.",
78                var_labels: ["sink_id"],
79            )),
80            rdkafka_txmsg_bytes: registry.register(metric!(
81                name: "mz_sink_rdkafka_txmsg_bytes",
82                help: "The total number of bytes transmitted (produced) to brokers.",
83                var_labels: ["sink_id"],
84            )),
85            rdkafka_tx: registry.register(metric!(
86                name: "mz_sink_rdkafka_tx",
87                help: "The total number of requests sent to brokers.",
88                var_labels: ["sink_id"],
89            )),
90            rdkafka_tx_bytes: registry.register(metric!(
91                name: "mz_sink_rdkafka_tx_bytes",
92                help: "The total number of bytes transmitted to brokers.",
93                var_labels: ["sink_id"],
94            )),
95            rdkafka_outbuf_cnt: registry.register(metric!(
96                name: "mz_sink_rdkafka_outbuf_cnt",
97                help: "The number of requests awaiting transmission across all brokers.",
98                var_labels: ["sink_id"],
99            )),
100            rdkafka_outbuf_msg_cnt: registry.register(metric!(
101                name: "mz_sink_rdkafka_outbuf_msg_cnt",
102                help: "The number of messages awaiting transmission across all brokers.",
103                var_labels: ["sink_id"],
104                visibility: MetricVisibility::Public,
105                tags: [MetricTag::Sink],
106            )),
107            rdkafka_waitresp_cnt: registry.register(metric!(
108                name: "mz_sink_rdkafka_waitresp_cnt",
109                help: "The number of requests in-flight across all brokers that are awaiting a \
110                       response.",
111                var_labels: ["sink_id"],
112            )),
113            rdkafka_waitresp_msg_cnt: registry.register(metric!(
114                name: "mz_sink_rdkafka_waitresp_msg_cnt",
115                help: "The number of messages in-flight across all brokers that are awaiting a \
116                       response.",
117                var_labels: ["sink_id"],
118            )),
119            rdkafka_txerrs: registry.register(metric!(
120                name: "mz_sink_rdkafka_txerrs",
121                help: "The total number of transmission errors across all brokers.",
122                var_labels: ["sink_id"],
123                visibility: MetricVisibility::Public,
124                tags: [MetricTag::Sink],
125            )),
126            rdkafka_txretries: registry.register(metric!(
127                name: "mz_sink_rdkafka_txretries",
128                help: "The total number of request retries across all brokers.",
129                var_labels: ["sink_id"],
130            )),
131            rdkafka_req_timeouts: registry.register(metric!(
132                name: "mz_sink_rdkafka_req_timeouts",
133                help: "The total number of requests that timed out across all brokers.",
134                var_labels: ["sink_id"],
135            )),
136            rdkafka_connects: registry.register(metric!(
137                name: "mz_sink_rdkafka_connects",
138                help: "The number of connection attempts, including successful and failed \
139                       attempts, and name resolution failures across all brokers.",
140                var_labels: ["sink_id"],
141                visibility: MetricVisibility::Public,
142                tags: [MetricTag::Sink],
143            )),
144            rdkafka_disconnects: registry.register(metric!(
145                name: "mz_sink_rdkafka_disconnects",
146                help: "The number of disconnections, whether triggered by the broker, the \
147                      network, the load balancer, or something else across all brokers.",
148                var_labels: ["sink_id"],
149                visibility: MetricVisibility::Public,
150                tags: [MetricTag::Sink],
151            )),
152            outstanding_progress_records: registry.register(metric!(
153                name: "mz_sink_oustanding_progress_records",
154                help: "The number of outstanding progress records that need to be read before the sink can resume.",
155                var_labels: ["sink_id"],
156            )),
157            consumed_progress_records: registry.register(metric!(
158                name: "mz_sink_consumed_progress_records",
159                help: "The number of progress records consumed by the sink.",
160                var_labels: ["sink_id"],
161            )),
162            partition_count: registry.register(metric!(
163                name: "mz_sink_partition_count",
164                help: "The number of partitions this sink is publishing to.",
165                var_labels: ["sink_id"],
166            )),
167        }
168    }
169}
170
171/// Metrics reported by librdkafka
172pub(crate) struct KafkaSinkMetrics {
173    /// The current number of messages in producer queues.
174    pub rdkafka_msg_cnt: DeleteOnDropGauge<AtomicU64, Vec<String>>,
175    /// The current total size of messages in producer queues.
176    pub rdkafka_msg_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
177    /// The total number of messages transmitted (produced) to brokers.
178    pub rdkafka_txmsgs: DeleteOnDropGauge<AtomicI64, Vec<String>>,
179    /// The total number of bytes transmitted (produced) to brokers.
180    pub rdkafka_txmsg_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
181    /// The total number of requests sent to brokers.
182    pub rdkafka_tx: DeleteOnDropGauge<AtomicI64, Vec<String>>,
183    /// The total number of bytes transmitted to brokers.
184    pub rdkafka_tx_bytes: DeleteOnDropGauge<AtomicI64, Vec<String>>,
185    /// The number of requests awaiting transmission across all brokers.
186    pub rdkafka_outbuf_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
187    /// The number of messages awaiting transmission across all brokers.
188    pub rdkafka_outbuf_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
189    /// The number of requests in-flight across all brokers that are awaiting a response.
190    pub rdkafka_waitresp_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
191    /// The number of messages in-flight across all brokers that are awaiting a response.
192    pub rdkafka_waitresp_msg_cnt: DeleteOnDropGauge<AtomicI64, Vec<String>>,
193    /// The total number of transmission errors across all brokers.
194    pub rdkafka_txerrs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
195    /// The total number of request retries across all brokers.
196    pub rdkafka_txretries: DeleteOnDropGauge<AtomicU64, Vec<String>>,
197    /// The total number of requests that timed out across all brokers.
198    pub rdkafka_req_timeouts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
199    /// The number of connection attempts, including successful and failed attempts, and name
200    /// resolution failures across all brokers.
201    pub rdkafka_connects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
202    /// The number of disconnections, whether triggered by the broker, the network, the load
203    /// balancer, or something else across all brokers.
204    pub rdkafka_disconnects: DeleteOnDropGauge<AtomicI64, Vec<String>>,
205    /// The number of outstanding progress records that need to be read before the sink can resume.
206    pub outstanding_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
207    /// The number of progress records consumed while resuming the sink.
208    pub consumed_progress_records: DeleteOnDropGauge<AtomicU64, Vec<String>>,
209    /// The number of partitions this sink is publishing to.
210    pub partition_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
211}
212
213impl KafkaSinkMetrics {
214    /// Initializes source metrics for a given (source_id, worker_id)
215    pub fn new(defs: &KafkaSinkMetricDefs, sink_id: GlobalId) -> Self {
216        let labels = &[sink_id.to_string()];
217        Self {
218            rdkafka_msg_cnt: defs
219                .rdkafka_msg_cnt
220                .get_delete_on_drop_metric(labels.to_vec()),
221            rdkafka_msg_size: defs
222                .rdkafka_msg_size
223                .get_delete_on_drop_metric(labels.to_vec()),
224            rdkafka_txmsgs: defs
225                .rdkafka_txmsgs
226                .get_delete_on_drop_metric(labels.to_vec()),
227            rdkafka_txmsg_bytes: defs
228                .rdkafka_txmsg_bytes
229                .get_delete_on_drop_metric(labels.to_vec()),
230            rdkafka_tx: defs.rdkafka_tx.get_delete_on_drop_metric(labels.to_vec()),
231            rdkafka_tx_bytes: defs
232                .rdkafka_tx_bytes
233                .get_delete_on_drop_metric(labels.to_vec()),
234            rdkafka_outbuf_cnt: defs
235                .rdkafka_outbuf_cnt
236                .get_delete_on_drop_metric(labels.to_vec()),
237            rdkafka_outbuf_msg_cnt: defs
238                .rdkafka_outbuf_msg_cnt
239                .get_delete_on_drop_metric(labels.to_vec()),
240            rdkafka_waitresp_cnt: defs
241                .rdkafka_waitresp_cnt
242                .get_delete_on_drop_metric(labels.to_vec()),
243            rdkafka_waitresp_msg_cnt: defs
244                .rdkafka_waitresp_msg_cnt
245                .get_delete_on_drop_metric(labels.to_vec()),
246            rdkafka_txerrs: defs
247                .rdkafka_txerrs
248                .get_delete_on_drop_metric(labels.to_vec()),
249            rdkafka_txretries: defs
250                .rdkafka_txretries
251                .get_delete_on_drop_metric(labels.to_vec()),
252            rdkafka_req_timeouts: defs
253                .rdkafka_req_timeouts
254                .get_delete_on_drop_metric(labels.to_vec()),
255            rdkafka_connects: defs
256                .rdkafka_connects
257                .get_delete_on_drop_metric(labels.to_vec()),
258            rdkafka_disconnects: defs
259                .rdkafka_disconnects
260                .get_delete_on_drop_metric(labels.to_vec()),
261            outstanding_progress_records: defs
262                .outstanding_progress_records
263                .get_delete_on_drop_metric(labels.to_vec()),
264            consumed_progress_records: defs
265                .consumed_progress_records
266                .get_delete_on_drop_metric(labels.to_vec()),
267            partition_count: defs
268                .partition_count
269                .get_delete_on_drop_metric(labels.to_vec()),
270        }
271    }
272}