mz_storage/metrics/source/
postgres.rs1use mz_ore::metric;
13use mz_ore::metrics::{
14 DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry, UIntGaugeVec,
15};
16use mz_repr::GlobalId;
17use prometheus::core::{AtomicF64, AtomicU64};
18use std::sync::Arc;
19use std::sync::Mutex;
20
21#[derive(Clone, Debug)]
23pub(crate) struct PgSourceMetricDefs {
24 pub(crate) total_messages: IntCounterVec,
25 pub(crate) transactions: IntCounterVec,
26 pub(crate) ignored_messages: IntCounterVec,
27 pub(crate) insert_messages: IntCounterVec,
28 pub(crate) update_messages: IntCounterVec,
29 pub(crate) delete_messages: IntCounterVec,
30 pub(crate) tables_in_publication: UIntGaugeVec,
31 pub(crate) wal_lsn: UIntGaugeVec,
32 pub(crate) table_count_latency: GaugeVec,
33}
34
35impl PgSourceMetricDefs {
36 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
37 Self {
38 total_messages: registry.register(metric!(
39 name: "mz_postgres_per_source_messages_total",
40 help: "The total number of replication messages for this source, not expected to be the sum of the other values.",
41 var_labels: ["source_id"],
42 )),
43 transactions: registry.register(metric!(
44 name: "mz_postgres_per_source_transactions_total",
45 help: "The number of committed transactions for all tables in this source",
46 var_labels: ["source_id"],
47 )),
48 ignored_messages: registry.register(metric!(
49 name: "mz_postgres_per_source_ignored_messages",
50 help: "The number of messages ignored because of an irrelevant type or relation_id",
51 var_labels: ["source_id"],
52 )),
53 insert_messages: registry.register(metric!(
54 name: "mz_postgres_per_source_inserts",
55 help: "The number of inserts for all tables in this source",
56 var_labels: ["source_id"],
57 )),
58 update_messages: registry.register(metric!(
59 name: "mz_postgres_per_source_updates",
60 help: "The number of updates for all tables in this source",
61 var_labels: ["source_id"],
62 )),
63 delete_messages: registry.register(metric!(
64 name: "mz_postgres_per_source_deletes",
65 help: "The number of deletes for all tables in this source",
66 var_labels: ["source_id"],
67 )),
68 tables_in_publication: registry.register(metric!(
69 name: "mz_postgres_per_source_tables_count",
70 help: "The number of upstream tables for this source",
71 var_labels: ["source_id"],
72 )),
73 wal_lsn: registry.register(metric!(
74 name: "mz_postgres_per_source_wal_lsn",
75 help: "LSN of the latest transaction committed for this source, see Postgres Replication docs for more details on LSN",
76 var_labels: ["source_id"],
77 )),
78 table_count_latency: registry.register(metric!(
79 name: "mz_postgres_snapshot_count_latency",
80 help: "The wall time used to obtain snapshot sizes.",
81 var_labels: ["source_id", "table_name"],
82 )),
83 }
84 }
85}
86
87#[derive(Clone)]
88pub(crate) struct PgSnapshotMetrics {
89 source_id: GlobalId,
90 gauges: Arc<Mutex<Vec<DeleteOnDropGauge<AtomicF64, Vec<String>>>>>,
94 defs: PgSourceMetricDefs,
95}
96
97impl PgSnapshotMetrics {
98 pub(crate) fn record_table_count_latency(&self, table_name: String, latency: f64) {
99 let latency_gauge = self
100 .defs
101 .table_count_latency
102 .get_delete_on_drop_metric(vec![self.source_id.to_string(), table_name]);
103 latency_gauge.set(latency);
104 self.gauges.lock().expect("poisoned").push(latency_gauge)
105 }
106}
107
108pub(crate) struct PgSourceMetrics {
110 pub(crate) inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
111 pub(crate) updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
112 pub(crate) deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
113 pub(crate) ignored: DeleteOnDropCounter<AtomicU64, Vec<String>>,
114 pub(crate) total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
115 pub(crate) transactions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
116 pub(crate) tables: DeleteOnDropGauge<AtomicU64, Vec<String>>,
117 pub(crate) lsn: DeleteOnDropGauge<AtomicU64, Vec<String>>,
118
119 pub(crate) snapshot_metrics: PgSnapshotMetrics,
120}
121
122impl PgSourceMetrics {
123 pub(crate) fn new(defs: &PgSourceMetricDefs, source_id: GlobalId) -> Self {
125 let labels = &[source_id.to_string()];
126 Self {
127 inserts: defs
128 .insert_messages
129 .get_delete_on_drop_metric(labels.to_vec()),
130 updates: defs
131 .update_messages
132 .get_delete_on_drop_metric(labels.to_vec()),
133 deletes: defs
134 .delete_messages
135 .get_delete_on_drop_metric(labels.to_vec()),
136 ignored: defs
137 .ignored_messages
138 .get_delete_on_drop_metric(labels.to_vec()),
139 total: defs
140 .total_messages
141 .get_delete_on_drop_metric(labels.to_vec()),
142 transactions: defs.transactions.get_delete_on_drop_metric(labels.to_vec()),
143 tables: defs
144 .tables_in_publication
145 .get_delete_on_drop_metric(labels.to_vec()),
146 lsn: defs.wal_lsn.get_delete_on_drop_metric(labels.to_vec()),
147 snapshot_metrics: PgSnapshotMetrics {
148 source_id,
149 gauges: Default::default(),
150 defs: defs.clone(),
151 },
152 }
153 }
154}