1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Metrics for Kafka consumption.
1112use std::collections::BTreeMap;
1314use mz_ore::iter::IteratorExt;
15use mz_ore::metric;
16use mz_ore::metrics::{DeleteOnDropGauge, IntGaugeVec, MetricsRegistry};
17use mz_repr::GlobalId;
18use prometheus::core::AtomicI64;
19use tracing::debug;
2021/// Definitions for kafka-specific per-partition metrics.
22#[derive(Clone, Debug)]
23pub(crate) struct KafkaSourceMetricDefs {
24pub(crate) partition_offset_max: IntGaugeVec,
25}
2627impl KafkaSourceMetricDefs {
28pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
29Self {
30 partition_offset_max: registry.register(metric!(
31 name: "mz_kafka_partition_offset_max",
32 help: "High watermark offset on broker for partition",
33 var_labels: ["topic", "source_id", "partition_id"],
34 )),
35 }
36 }
37}
3839/// Kafka-specific per-partition metrics.
40pub(crate) struct KafkaSourceMetrics {
41 labels: Vec<String>,
42 defs: KafkaSourceMetricDefs,
43 partition_offset_map: BTreeMap<i32, DeleteOnDropGauge<AtomicI64, Vec<String>>>,
44}
4546impl KafkaSourceMetrics {
47/// Create a `KafkaSourceMetrics` from the `KafkaSourceMetricDefs`.
48pub(crate) fn new(
49 defs: &KafkaSourceMetricDefs,
50 ids: Vec<i32>,
51 topic: String,
52 source_id: GlobalId,
53 ) -> Self {
54Self {
55 partition_offset_map: BTreeMap::from_iter(ids.iter().map(|id| {
56let labels = &[topic.clone(), source_id.to_string(), format!("{}", id)];
57 (
58*id,
59 defs.partition_offset_max
60 .get_delete_on_drop_metric(labels.to_vec()),
61 )
62 })),
63 labels: vec![topic.clone(), source_id.to_string()],
64 defs: defs.clone(),
65 }
66 }
6768pub(crate) fn set_offset_max(&mut self, id: i32, offset: i64) {
69// Valid partition ids start at 0, librdkafka uses -1 as a sentinel for unassigned partitions
70if id < 0 {
71return;
72 }
73// This offset value is another librdkafka sentinel indicating it got an invalid high watermark from the broker
74if offset == -1001 {
75// TODO(nharring-adjacent): This is potentially spammy so its at debug but it would be better as info with sampling
76debug!("Got invalid high watermark for partition {}", id);
77return;
78 }
79self.partition_offset_map
80 .entry(id)
81 .or_insert_with_key(|id| {
82self.defs.partition_offset_max.get_delete_on_drop_metric(
83self.labels
84 .iter()
85 .cloned()
86 .chain_one(format!("{}", id))
87 .collect(),
88 )
89 })
90 .set(offset);
91 }
92}