mz_storage/metrics/source/
kafka.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for Kafka consumption.
11
12use std::collections::BTreeMap;
13
14use mz_ore::iter::IteratorExt;
15use mz_ore::metric;
16use mz_ore::metrics::{DeleteOnDropGauge, IntGaugeVec, MetricsRegistry};
17use mz_repr::GlobalId;
18use prometheus::core::AtomicI64;
19use tracing::debug;
20
/// Definitions for kafka-specific per-partition metrics.
///
/// This holds the labelled gauge vector from which the individual
/// per-partition gauges are created. It is `Clone` so that each
/// `KafkaSourceMetrics` can keep its own copy for creating gauges later.
#[derive(Clone, Debug)]
pub(crate) struct KafkaSourceMetricDefs {
    /// Gauge vector registered as `mz_kafka_partition_offset_max`, labelled by
    /// `topic`, `source_id` and `partition_id` (in that order).
    pub(crate) partition_offset_max: IntGaugeVec,
}
26
27impl KafkaSourceMetricDefs {
28    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
29        Self {
30            partition_offset_max: registry.register(metric!(
31                name: "mz_kafka_partition_offset_max",
32                help: "High watermark offset on broker for partition",
33                var_labels: ["topic", "source_id", "partition_id"],
34            )),
35        }
36    }
37}
38
/// Kafka-specific per-partition metrics.
pub(crate) struct KafkaSourceMetrics {
    /// Label values shared by every partition gauge of this source: the topic
    /// followed by the source id. The partition id is appended per gauge.
    labels: Vec<String>,
    /// Metric definitions, kept so that gauges can be created for partitions
    /// that were not known at construction time.
    defs: KafkaSourceMetricDefs,
    /// One high-watermark gauge per partition, keyed by partition id.
    partition_offset_map: BTreeMap<i32, DeleteOnDropGauge<AtomicI64, Vec<String>>>,
}
45
46impl KafkaSourceMetrics {
47    /// Create a `KafkaSourceMetrics` from the `KafkaSourceMetricDefs`.
48    pub(crate) fn new(
49        defs: &KafkaSourceMetricDefs,
50        ids: Vec<i32>,
51        topic: String,
52        source_id: GlobalId,
53    ) -> Self {
54        Self {
55            partition_offset_map: BTreeMap::from_iter(ids.iter().map(|id| {
56                let labels = &[topic.clone(), source_id.to_string(), format!("{}", id)];
57                (
58                    *id,
59                    defs.partition_offset_max
60                        .get_delete_on_drop_metric(labels.to_vec()),
61                )
62            })),
63            labels: vec![topic.clone(), source_id.to_string()],
64            defs: defs.clone(),
65        }
66    }
67
68    pub(crate) fn set_offset_max(&mut self, id: i32, offset: i64) {
69        // Valid partition ids start at 0, librdkafka uses -1 as a sentinel for unassigned partitions
70        if id < 0 {
71            return;
72        }
73        // This offset value is another librdkafka sentinel indicating it got an invalid high watermark from the broker
74        if offset == -1001 {
75            // TODO(nharring-adjacent): This is potentially spammy so its at debug but it would be better as info with sampling
76            debug!("Got invalid high watermark for partition {}", id);
77            return;
78        }
79        self.partition_offset_map
80            .entry(id)
81            .or_insert_with_key(|id| {
82                self.defs.partition_offset_max.get_delete_on_drop_metric(
83                    self.labels
84                        .iter()
85                        .cloned()
86                        .chain_one(format!("{}", id))
87                        .collect(),
88                )
89            })
90            .set(offset);
91    }
92}