1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use mz_ore::iter::IteratorExt;
use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt};
use mz_repr::GlobalId;
use prometheus::core::AtomicI64;
use tracing::debug;

use crate::source::metrics::SourceBaseMetrics;
/// Per-partition offset gauges for a single Kafka source.
///
/// Holds one `partition_offset_max` gauge per known partition id, plus the
/// pieces needed to create gauges for partition ids first seen after
/// construction.
pub(super) struct KafkaPartitionMetrics {
    /// Label prefix shared by every gauge of this source: `[topic, source_id]`.
    /// The partition id is appended as the final label when a gauge is created.
    labels: Vec<String>,
    /// Source metrics whose `partition_specific.partition_offset_max` family
    /// the per-partition gauges are obtained from.
    base_metrics: SourceBaseMetrics,
    /// Gauges keyed by Kafka partition id.
    partition_offset_map: BTreeMap<i32, DeleteOnDropGauge<'static, AtomicI64, Vec<String>>>,
}

impl KafkaPartitionMetrics {
    pub fn new(
        base_metrics: SourceBaseMetrics,
        ids: Vec<i32>,
        topic: String,
        source_id: GlobalId,
    ) -> Self {
        let metrics = &base_metrics.partition_specific;
        Self {
            partition_offset_map: BTreeMap::from_iter(ids.iter().map(|id| {
                let labels = &[topic.clone(), source_id.to_string(), format!("{}", id)];
                (
                    *id,
                    metrics
                        .partition_offset_max
                        .get_delete_on_drop_gauge(labels.to_vec()),
                )
            })),
            labels: vec![topic.clone(), source_id.to_string()],
            base_metrics,
        }
    }

    pub fn set_offset_max(&mut self, id: i32, offset: i64) {
        // Valid partition ids start at 0, librdkafka uses -1 as a sentinel for unassigned partitions
        if id < 0 {
            return;
        }
        // This offset value is another librdkafka sentinel indicating it got an invalid high watermark from the broker
        if offset == -1001 {
            // TODO(nharring-adjacent): This is potentially spammy so its at debug but it would be better as info with sampling
            debug!("Got invalid high watermark for partition {}", id);
            return;
        }
        self.partition_offset_map
            .entry(id)
            .or_insert_with_key(|id| {
                self.base_metrics
                    .partition_specific
                    .partition_offset_max
                    .get_delete_on_drop_gauge(
                        self.labels
                            .iter()
                            .cloned()
                            .chain_one(format!("{}", id))
                            .collect(),
                    )
            })
            .set(offset);
    }
}