mz_storage/metrics/source/
kafka.rs1use std::collections::BTreeMap;
13
14use mz_ore::iter::IteratorExt;
15use mz_ore::metric;
16use mz_ore::metrics::{DeleteOnDropGauge, IntGaugeVec, MetricsRegistry};
17use mz_repr::GlobalId;
18use prometheus::core::AtomicI64;
19use tracing::debug;
20
21#[derive(Clone, Debug)]
23pub(crate) struct KafkaSourceMetricDefs {
24 pub(crate) partition_offset_max: IntGaugeVec,
25}
26
27impl KafkaSourceMetricDefs {
28 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
29 Self {
30 partition_offset_max: registry.register(metric!(
31 name: "mz_kafka_partition_offset_max",
32 help: "High watermark offset on broker for partition",
33 var_labels: ["topic", "source_id", "partition_id"],
34 )),
35 }
36 }
37}
38
39pub(crate) struct KafkaSourceMetrics {
41 labels: Vec<String>,
42 defs: KafkaSourceMetricDefs,
43 partition_offset_map: BTreeMap<i32, DeleteOnDropGauge<AtomicI64, Vec<String>>>,
44}
45
46impl KafkaSourceMetrics {
47 pub(crate) fn new(
49 defs: &KafkaSourceMetricDefs,
50 ids: Vec<i32>,
51 topic: String,
52 source_id: GlobalId,
53 ) -> Self {
54 Self {
55 partition_offset_map: BTreeMap::from_iter(ids.iter().map(|id| {
56 let labels = &[topic.clone(), source_id.to_string(), format!("{}", id)];
57 (
58 *id,
59 defs.partition_offset_max
60 .get_delete_on_drop_metric(labels.to_vec()),
61 )
62 })),
63 labels: vec![topic.clone(), source_id.to_string()],
64 defs: defs.clone(),
65 }
66 }
67
68 pub(crate) fn set_offset_max(&mut self, id: i32, offset: i64) {
69 if id < 0 {
71 return;
72 }
73 if offset == -1001 {
75 debug!("Got invalid high watermark for partition {}", id);
77 return;
78 }
79 self.partition_offset_map
80 .entry(id)
81 .or_insert_with_key(|id| {
82 self.defs.partition_offset_max.get_delete_on_drop_metric(
83 self.labels
84 .iter()
85 .cloned()
86 .chain_one(format!("{}", id))
87 .collect(),
88 )
89 })
90 .set(offset);
91 }
92}