// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics for Postgres sources.
1112use mz_ore::metric;
13use mz_ore::metrics::{
14 DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry, UIntGaugeVec,
15};
16use mz_repr::GlobalId;
17use prometheus::core::{AtomicF64, AtomicU64};
18use std::sync::Arc;
19use std::sync::Mutex;
2021/// Definitions for Postgres source metrics.
22#[derive(Clone, Debug)]
23pub(crate) struct PgSourceMetricDefs {
24pub(crate) total_messages: IntCounterVec,
25pub(crate) transactions: IntCounterVec,
26pub(crate) ignored_messages: IntCounterVec,
27pub(crate) insert_messages: IntCounterVec,
28pub(crate) update_messages: IntCounterVec,
29pub(crate) delete_messages: IntCounterVec,
30pub(crate) tables_in_publication: UIntGaugeVec,
31pub(crate) wal_lsn: UIntGaugeVec,
32pub(crate) table_count_latency: GaugeVec,
33}
3435impl PgSourceMetricDefs {
36pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
37Self {
38 total_messages: registry.register(metric!(
39 name: "mz_postgres_per_source_messages_total",
40 help: "The total number of replication messages for this source, not expected to be the sum of the other values.",
41 var_labels: ["source_id"],
42 )),
43 transactions: registry.register(metric!(
44 name: "mz_postgres_per_source_transactions_total",
45 help: "The number of committed transactions for all tables in this source",
46 var_labels: ["source_id"],
47 )),
48 ignored_messages: registry.register(metric!(
49 name: "mz_postgres_per_source_ignored_messages",
50 help: "The number of messages ignored because of an irrelevant type or relation_id",
51 var_labels: ["source_id"],
52 )),
53 insert_messages: registry.register(metric!(
54 name: "mz_postgres_per_source_inserts",
55 help: "The number of inserts for all tables in this source",
56 var_labels: ["source_id"],
57 )),
58 update_messages: registry.register(metric!(
59 name: "mz_postgres_per_source_updates",
60 help: "The number of updates for all tables in this source",
61 var_labels: ["source_id"],
62 )),
63 delete_messages: registry.register(metric!(
64 name: "mz_postgres_per_source_deletes",
65 help: "The number of deletes for all tables in this source",
66 var_labels: ["source_id"],
67 )),
68 tables_in_publication: registry.register(metric!(
69 name: "mz_postgres_per_source_tables_count",
70 help: "The number of upstream tables for this source",
71 var_labels: ["source_id"],
72 )),
73 wal_lsn: registry.register(metric!(
74 name: "mz_postgres_per_source_wal_lsn",
75 help: "LSN of the latest transaction committed for this source, see Postgres Replication docs for more details on LSN",
76 var_labels: ["source_id"],
77 )),
78 table_count_latency: registry.register(metric!(
79 name: "mz_postgres_snapshot_count_latency",
80 help: "The wall time used to obtain snapshot sizes.",
81 var_labels: ["source_id", "table_name", "strict"],
82 )),
83 }
84 }
85}
8687#[derive(Clone)]
88pub(crate) struct PgSnapshotMetrics {
89 source_id: GlobalId,
90// This has to be shared between tokio tasks and the replication operator, as the collection
91 // of these metrics happens once in those tasks, which do not live long enough to keep them
92 // alive.
93gauges: Arc<Mutex<Vec<DeleteOnDropGauge<AtomicF64, Vec<String>>>>>,
94 defs: PgSourceMetricDefs,
95}
9697impl PgSnapshotMetrics {
98pub(crate) fn record_table_count_latency(
99&self,
100 table_name: String,
101 latency: f64,
102 strict: bool,
103 ) {
104let latency_gauge = self
105.defs
106 .table_count_latency
107 .get_delete_on_drop_metric(vec![
108self.source_id.to_string(),
109 table_name,
110 strict.to_string(),
111 ]);
112 latency_gauge.set(latency);
113self.gauges.lock().expect("poisoned").push(latency_gauge)
114 }
115}
116117/// Metrics for Postgres sources.
118pub(crate) struct PgSourceMetrics {
119pub(crate) inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
120pub(crate) updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
121pub(crate) deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
122pub(crate) ignored: DeleteOnDropCounter<AtomicU64, Vec<String>>,
123pub(crate) total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
124pub(crate) transactions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
125pub(crate) tables: DeleteOnDropGauge<AtomicU64, Vec<String>>,
126pub(crate) lsn: DeleteOnDropGauge<AtomicU64, Vec<String>>,
127128pub(crate) snapshot_metrics: PgSnapshotMetrics,
129}
130131impl PgSourceMetrics {
132/// Create a `PgSourceMetrics` from the `PgSourceMetricDefs`.
133pub(crate) fn new(defs: &PgSourceMetricDefs, source_id: GlobalId) -> Self {
134let labels = &[source_id.to_string()];
135Self {
136 inserts: defs
137 .insert_messages
138 .get_delete_on_drop_metric(labels.to_vec()),
139 updates: defs
140 .update_messages
141 .get_delete_on_drop_metric(labels.to_vec()),
142 deletes: defs
143 .delete_messages
144 .get_delete_on_drop_metric(labels.to_vec()),
145 ignored: defs
146 .ignored_messages
147 .get_delete_on_drop_metric(labels.to_vec()),
148 total: defs
149 .total_messages
150 .get_delete_on_drop_metric(labels.to_vec()),
151 transactions: defs.transactions.get_delete_on_drop_metric(labels.to_vec()),
152 tables: defs
153 .tables_in_publication
154 .get_delete_on_drop_metric(labels.to_vec()),
155 lsn: defs.wal_lsn.get_delete_on_drop_metric(labels.to_vec()),
156 snapshot_metrics: PgSnapshotMetrics {
157 source_id,
158 gauges: Default::default(),
159 defs: defs.clone(),
160 },
161 }
162 }
163}