//! Metrics for dataflow sources.
//!
//! General metrics for all source types live here; Kafka-, MySQL-, and
//! Postgres-specific metrics live in the corresponding submodules.

use mz_ore::metric;
use mz_ore::metrics::{
DeleteOnDropCounter, DeleteOnDropGauge, IntCounter, IntCounterVec, IntGaugeVec,
MetricsRegistry, UIntGaugeVec,
};
use mz_repr::GlobalId;
use prometheus::core::{AtomicI64, AtomicU64};

pub mod kafka;
pub mod mysql;
pub mod postgres;

/// Definitions of general metrics that apply to every source type. These
/// labeled metric vectors are registered once per process; per-source
/// instances are carved out of them via `get_delete_on_drop_metric`.
#[derive(Clone, Debug)]
pub(crate) struct GeneralSourceMetricDefs {
    // Metrics used by `SourceMetrics`.
    pub(crate) capability: UIntGaugeVec,
    pub(crate) resume_upper: IntGaugeVec,
    pub(crate) commit_upper_ready_times: UIntGaugeVec,
    pub(crate) commit_upper_accepted_times: UIntGaugeVec,
    // Metrics used by `OffsetCommitMetrics`.
    pub(crate) offset_commit_failures: IntCounterVec,
    // Metrics used by `SourcePersistSinkMetrics`.
    pub(crate) progress: IntGaugeVec,
    pub(crate) row_inserts: IntCounterVec,
    pub(crate) row_retractions: IntCounterVec,
    pub(crate) error_inserts: IntCounterVec,
    pub(crate) error_retractions: IntCounterVec,
    pub(crate) persist_sink_processed_batches: IntCounterVec,
}

impl GeneralSourceMetricDefs {
    /// Registers the general source metric definitions with `registry`.
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
Self {
capability: registry.register(metric!(
name: "mz_capability",
help: "The current capability for this dataflow.",
var_labels: ["topic", "source_id", "worker_id"],
)),
resume_upper: registry.register(metric!(
name: "mz_resume_upper",
help: "The timestamp-domain resumption frontier chosen for a source's ingestion",
var_labels: ["source_id"],
)),
commit_upper_ready_times: registry.register(metric!(
name: "mz_source_commit_upper_ready_times",
help: "The number of ready remap bindings that are held in the reclock commit upper operator.",
var_labels: ["source_id", "worker_id"],
)),
commit_upper_accepted_times: registry.register(metric!(
name: "mz_source_commit_upper_accepted_times",
help: "The number of accepted remap bindings that are held in the reclock commit upper operator.",
var_labels: ["source_id", "worker_id"],
)),
offset_commit_failures: registry.register(metric!(
name: "mz_source_offset_commit_failures",
help: "A counter representing how many times we have failed to commit offsets for a source",
var_labels: ["source_id"],
)),
progress: registry.register(metric!(
name: "mz_source_progress",
help: "A timestamp gauge representing forward progess in the data shard",
var_labels: ["source_id", "output", "shard", "worker_id"],
)),
row_inserts: registry.register(metric!(
name: "mz_source_row_inserts",
help: "A counter representing the actual number of rows being inserted to the data shard",
var_labels: ["source_id", "output", "shard", "worker_id"],
)),
row_retractions: registry.register(metric!(
name: "mz_source_row_retractions",
help: "A counter representing the actual number of rows being retracted from the data shard",
var_labels: ["source_id", "output", "shard", "worker_id"],
)),
error_inserts: registry.register(metric!(
name: "mz_source_error_inserts",
help: "A counter representing the actual number of errors being inserted to the data shard",
var_labels: ["source_id", "output", "shard", "worker_id"],
)),
error_retractions: registry.register(metric!(
name: "mz_source_error_retractions",
help: "A counter representing the actual number of errors being retracted from the data shard",
var_labels: ["source_id", "output", "shard", "worker_id"],
)),
persist_sink_processed_batches: registry.register(metric!(
name: "mz_source_processed_batches",
help: "A counter representing the number of persist sink batches with actual data \
we have successfully processed.",
var_labels: ["source_id", "output", "shard", "worker_id"],
)),
}
}
}

/// General, per-worker metrics for a single source. Dropping this struct
/// deregisters its metrics from the registry.
pub(crate) struct SourceMetrics {
    /// The current capability for this source's dataflow.
    pub(crate) capability: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
    /// The timestamp-domain resumption frontier chosen for this source's ingestion.
    pub(crate) resume_upper: DeleteOnDropGauge<'static, AtomicI64, Vec<String>>,
    /// Ready remap bindings held in the reclock commit upper operator.
    pub(crate) commit_upper_ready_times: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
    /// Accepted remap bindings held in the reclock commit upper operator.
    pub(crate) commit_upper_accepted_times: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
}

impl SourceMetrics {
    /// Initializes source metrics for the given source and worker.
    pub(crate) fn new(
defs: &GeneralSourceMetricDefs,
source_name: &str,
source_id: GlobalId,
worker_id: usize,
) -> SourceMetrics {
let labels = &[
source_name.to_string(),
source_id.to_string(),
worker_id.to_string(),
];
SourceMetrics {
capability: defs.capability.get_delete_on_drop_metric(labels.to_vec()),
resume_upper: defs
.resume_upper
.get_delete_on_drop_metric(vec![source_id.to_string()]),
commit_upper_ready_times: defs
.commit_upper_ready_times
.get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
commit_upper_accepted_times: defs
.commit_upper_accepted_times
.get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
}
}
}
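
// A minimal usage sketch (hypothetical wiring; real call sites live in the
// source rendering code, and `source_id` here stands in for an actual
// `GlobalId`):
//
//     let registry = MetricsRegistry::new();
//     let defs = GeneralSourceMetricDefs::register_with(&registry);
//     let metrics = SourceMetrics::new(&defs, "my-topic", source_id, /* worker_id */ 0);
//     metrics.capability.set(42); // the gauges deref to their prometheus types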

/// Per-worker metrics for the `persist_sink` of a single source export.
pub(crate) struct SourcePersistSinkMetrics {
    /// A timestamp gauge representing forward progress in the data shard.
    pub(crate) progress: DeleteOnDropGauge<'static, AtomicI64, Vec<String>>,
    /// Rows inserted into the data shard.
    pub(crate) row_inserts: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
    /// Rows retracted from the data shard.
    pub(crate) row_retractions: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
    /// Errors inserted into the data shard.
    pub(crate) error_inserts: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
    /// Errors retracted from the data shard.
    pub(crate) error_retractions: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
    /// Persist sink batches with actual data that were successfully processed.
    pub(crate) processed_batches: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
}

impl SourcePersistSinkMetrics {
    /// Initializes metrics for the `persist_sink` of one source export.
    pub(crate) fn new(
defs: &GeneralSourceMetricDefs,
_source_id: GlobalId,
parent_source_id: GlobalId,
worker_id: usize,
shard_id: &mz_persist_client::ShardId,
output_index: usize,
) -> SourcePersistSinkMetrics {
let shard = shard_id.to_string();
SourcePersistSinkMetrics {
progress: defs.progress.get_delete_on_drop_metric(vec![
parent_source_id.to_string(),
output_index.to_string(),
shard.clone(),
worker_id.to_string(),
]),
row_inserts: defs.row_inserts.get_delete_on_drop_metric(vec![
parent_source_id.to_string(),
output_index.to_string(),
shard.clone(),
worker_id.to_string(),
]),
row_retractions: defs.row_retractions.get_delete_on_drop_metric(vec![
parent_source_id.to_string(),
output_index.to_string(),
shard.clone(),
worker_id.to_string(),
]),
error_inserts: defs.error_inserts.get_delete_on_drop_metric(vec![
parent_source_id.to_string(),
output_index.to_string(),
shard.clone(),
worker_id.to_string(),
]),
error_retractions: defs.error_retractions.get_delete_on_drop_metric(vec![
parent_source_id.to_string(),
output_index.to_string(),
shard.clone(),
worker_id.to_string(),
]),
processed_batches: defs
.persist_sink_processed_batches
.get_delete_on_drop_metric(vec![
parent_source_id.to_string(),
output_index.to_string(),
shard,
worker_id.to_string(),
]),
}
}
}
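
// A minimal usage sketch (hypothetical values; `shard_id` would come from the
// persist handle of the export being written, `updates` from the batch):
//
//     let sink_metrics = SourcePersistSinkMetrics::new(
//         &defs, source_id, parent_source_id, /* worker_id */ 0, &shard_id, /* output_index */ 0,
//     );
//     sink_metrics.row_inserts.inc_by(updates.len() as u64);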

/// Metrics about committing offsets for a source.
pub(crate) struct OffsetCommitMetrics {
    /// The number of times offset commits have failed for this source.
    pub(crate) offset_commit_failures: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
}

impl OffsetCommitMetrics {
    /// Initializes offset commit metrics for the given source.
    pub(crate) fn new(defs: &GeneralSourceMetricDefs, source_id: GlobalId) -> OffsetCommitMetrics {
OffsetCommitMetrics {
offset_commit_failures: defs
.offset_commit_failures
.get_delete_on_drop_metric(vec![source_id.to_string()]),
}
}
}
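
// A minimal usage sketch (hypothetical `commit_offsets` call):
//
//     let commit_metrics = OffsetCommitMetrics::new(&defs, source_id);
//     if commit_offsets().await.is_err() {
//         commit_metrics.offset_commit_failures.inc();
//     }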

/// The full set of source metric definitions: the general definitions above
/// plus the source-type-specific ones from the submodules.
#[derive(Debug, Clone)]
pub(crate) struct SourceMetricDefs {
    pub(crate) source_defs: GeneralSourceMetricDefs,
    pub(crate) postgres_defs: postgres::PgSourceMetricDefs,
    pub(crate) mysql_defs: mysql::MySqlSourceMetricDefs,
    pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
    /// A counter of bytes read, shared across all source types.
    pub(crate) bytes_read: IntCounter,
}

impl SourceMetricDefs {
    /// Registers all source metric definitions with `registry`.
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
Self {
source_defs: GeneralSourceMetricDefs::register_with(registry),
postgres_defs: postgres::PgSourceMetricDefs::register_with(registry),
mysql_defs: mysql::MySqlSourceMetricDefs::register_with(registry),
kafka_source_defs: kafka::KafkaSourceMetricDefs::register_with(registry),
bytes_read: registry.register(metric!(
name: "mz_bytes_read_total",
help: "Count of bytes read from sources",
)),
}
}
}
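
// A minimal registration sketch (hypothetical wiring): one `SourceMetricDefs`
// is typically created per process and handed to every source dataflow.
//
//     let registry = MetricsRegistry::new();
//     let defs = SourceMetricDefs::register_with(&registry);
//     defs.bytes_read.inc_by(bytes.len() as u64);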