1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::collections::BTreeMap;
use mz_ore::iter::IteratorExt;
use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt};
use mz_repr::GlobalId;
use prometheus::core::AtomicI64;
use tracing::debug;
use crate::source::metrics::SourceBaseMetrics;
/// Tracks the maximum known offset of each partition of one Kafka topic as a
/// per-partition gauge.
///
/// Gauges are created from `base_metrics.partition_specific.partition_offset_max`
/// and labelled with the topic, the source id and the partition id.
pub(super) struct KafkaPartitionMetrics {
/// Label values shared by every partition gauge: the topic followed by the
/// source id. The partition id is appended when a gauge is created.
labels: Vec<String>,
/// Source-wide metric handles; kept so gauges for partitions that were not
/// known at construction time can still be created later.
base_metrics: SourceBaseMetrics,
/// Gauge for each known partition, keyed by partition id.
// NOTE(review): the `DeleteOnDropGauge` name suggests the labelled series is
// removed from the registry when the entry is dropped — confirm in `mz_ore`.
partition_offset_map: BTreeMap<i32, DeleteOnDropGauge<'static, AtomicI64, Vec<String>>>,
}
impl KafkaPartitionMetrics {
pub fn new(
base_metrics: SourceBaseMetrics,
ids: Vec<i32>,
topic: String,
source_id: GlobalId,
) -> Self {
let metrics = &base_metrics.partition_specific;
Self {
partition_offset_map: BTreeMap::from_iter(ids.iter().map(|id| {
let labels = &[topic.clone(), source_id.to_string(), format!("{}", id)];
(
*id,
metrics
.partition_offset_max
.get_delete_on_drop_gauge(labels.to_vec()),
)
})),
labels: vec![topic.clone(), source_id.to_string()],
base_metrics,
}
}
pub fn set_offset_max(&mut self, id: i32, offset: i64) {
// Valid partition ids start at 0, librdkafka uses -1 as a sentinel for unassigned partitions
if id < 0 {
return;
}
// This offset value is another librdkafka sentinel indicating it got an invalid high watermark from the broker
if offset == -1001 {
// TODO(nharring-adjacent): This is potentially spammy so its at debug but it would be better as info with sampling
debug!("Got invalid high watermark for partition {}", id);
return;
}
self.partition_offset_map
.entry(id)
.or_insert_with_key(|id| {
self.base_metrics
.partition_specific
.partition_offset_max
.get_delete_on_drop_gauge(
self.labels
.iter()
.cloned()
.chain_one(format!("{}", id))
.collect(),
)
})
.set(offset);
}
}