1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
// Copyright Materialize, Inc. and contributors. All rights reserved.
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Metrics for Kafka consumption.
use std::collections::BTreeMap;
use mz_ore::iter::IteratorExt;
use mz_ore::metric;
use mz_ore::metrics::{DeleteOnDropGauge, IntGaugeVec, MetricsRegistry};
use mz_repr::GlobalId;
use prometheus::core::AtomicI64;
use tracing::debug;
/// Definitions for kafka-specific per-partition metrics.
///
/// This struct only holds the metric vectors; it is built by
/// `KafkaSourceMetricDefs::register_with` and cloned into each
/// `KafkaSourceMetrics`.
#[derive(Clone, Debug)]
pub(crate) struct KafkaSourceMetricDefs {
/// Gauge vector registered under the name `mz_kafka_partition_offset_max`
/// ("High watermark offset on broker for partition"), with the labels
/// `topic`, `source_id` and `partition_id`.
pub(crate) partition_offset_max: IntGaugeVec,
impl KafkaSourceMetricDefs {
/// Registers the Kafka per-partition metric definitions with `registry`
/// and returns a `KafkaSourceMetricDefs` holding the registered vectors.
pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
Self {
// Gauge vector keyed by the `topic`, `source_id` and `partition_id` labels.
partition_offset_max: registry.register(metric!(
name: "mz_kafka_partition_offset_max",
help: "High watermark offset on broker for partition",
var_labels: ["topic", "source_id", "partition_id"],
/// Kafka-specific per-partition metrics.
pub(crate) struct KafkaSourceMetrics {
/// Label values that do not depend on the partition: the topic followed by
/// the source id rendered as a string (see `new`).
labels: Vec<String>,
/// Clone of the metric definitions handed to `new`.
defs: KafkaSourceMetricDefs,
/// One gauge per partition, keyed by partition id.
// NOTE(review): judging by the `DeleteOnDropGauge` name, each gauge is
// presumably removed from the registry when dropped; confirm in `mz_ore::metrics`.
partition_offset_map: BTreeMap<i32, DeleteOnDropGauge<AtomicI64, Vec<String>>>,
impl KafkaSourceMetrics {
/// Create a `KafkaSourceMetrics` from the `KafkaSourceMetricDefs`.
///
/// * `defs` - the metric definitions; a clone is stored in the returned value.
/// * `ids` - partition ids used to seed `partition_offset_map`.
/// * `topic` - supplies the `topic` label value.
/// * `source_id` - supplies the `source_id` label value (via `to_string`).
pub(crate) fn new(
defs: &KafkaSourceMetricDefs,
ids: Vec<i32>,
topic: String,
source_id: GlobalId,
) -> Self {
Self {
// Seed the map with one entry for every id in `ids`.
partition_offset_map: BTreeMap::from_iter(ids.iter().map(|id| {
// Full label set for this partition, in the same order as the
// `var_labels` of `mz_kafka_partition_offset_max`:
// topic, source id, partition id.
let labels = &[topic.clone(), source_id.to_string(), format!("{}", id)];
// Only the two partition-independent label values are stored here.
labels: vec![topic.clone(), source_id.to_string()],
defs: defs.clone(),
pub(crate) fn set_offset_max(&mut self, id: i32, offset: i64) {
// Valid partition ids start at 0; librdkafka uses -1 as a sentinel for unassigned partitions.
if id < 0 {
// This offset value (-1001) is another librdkafka sentinel, indicating that it got an invalid high watermark from the broker.
if offset == -1001 {
// TODO(nharring-adjacent): This is potentially spammy, so it's at debug, but it would be better as info with sampling.
debug!("Got invalid high watermark for partition {}", id);
.or_insert_with_key(|id| {
.chain_one(format!("{}", id))