mz_storage/metrics/source/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for Postgres.
11
12use mz_ore::metric;
13use mz_ore::metrics::{
14    DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry, UIntGaugeVec,
15};
16use mz_repr::GlobalId;
17use prometheus::core::{AtomicF64, AtomicU64};
18use std::sync::Arc;
19use std::sync::Mutex;
20
21/// Definitions for Postgres source metrics.
22#[derive(Clone, Debug)]
23pub(crate) struct PgSourceMetricDefs {
24    pub(crate) total_messages: IntCounterVec,
25    pub(crate) transactions: IntCounterVec,
26    pub(crate) ignored_messages: IntCounterVec,
27    pub(crate) insert_messages: IntCounterVec,
28    pub(crate) update_messages: IntCounterVec,
29    pub(crate) delete_messages: IntCounterVec,
30    pub(crate) tables_in_publication: UIntGaugeVec,
31    pub(crate) wal_lsn: UIntGaugeVec,
32    pub(crate) table_count_latency: GaugeVec,
33}
34
35impl PgSourceMetricDefs {
36    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
37        Self {
38            total_messages: registry.register(metric!(
39                name: "mz_postgres_per_source_messages_total",
40                help: "The total number of replication messages for this source, not expected to be the sum of the other values.",
41                var_labels: ["source_id"],
42            )),
43            transactions: registry.register(metric!(
44                name: "mz_postgres_per_source_transactions_total",
45                help: "The number of committed transactions for all tables in this source",
46                var_labels: ["source_id"],
47            )),
48            ignored_messages: registry.register(metric!(
49                name: "mz_postgres_per_source_ignored_messages",
50                help: "The number of messages ignored because of an irrelevant type or relation_id",
51                var_labels: ["source_id"],
52            )),
53            insert_messages: registry.register(metric!(
54                name: "mz_postgres_per_source_inserts",
55                help: "The number of inserts for all tables in this source",
56                var_labels: ["source_id"],
57            )),
58            update_messages: registry.register(metric!(
59                name: "mz_postgres_per_source_updates",
60                help: "The number of updates for all tables in this source",
61                var_labels: ["source_id"],
62            )),
63            delete_messages: registry.register(metric!(
64                name: "mz_postgres_per_source_deletes",
65                help: "The number of deletes for all tables in this source",
66                var_labels: ["source_id"],
67            )),
68            tables_in_publication: registry.register(metric!(
69                name: "mz_postgres_per_source_tables_count",
70                help: "The number of upstream tables for this source",
71                var_labels: ["source_id"],
72            )),
73            wal_lsn: registry.register(metric!(
74                name: "mz_postgres_per_source_wal_lsn",
75                help: "LSN of the latest transaction committed for this source, see Postgres Replication docs for more details on LSN",
76                var_labels: ["source_id"],
77            )),
78            table_count_latency: registry.register(metric!(
79                name: "mz_postgres_snapshot_count_latency",
80                help: "The wall time used to obtain snapshot sizes.",
81                var_labels: ["source_id", "table_name"],
82            )),
83        }
84    }
85}
86
87#[derive(Clone)]
88pub(crate) struct PgSnapshotMetrics {
89    source_id: GlobalId,
90    // This has to be shared between tokio tasks and the replication operator, as the collection
91    // of these metrics happens once in those tasks, which do not live long enough to keep them
92    // alive.
93    gauges: Arc<Mutex<Vec<DeleteOnDropGauge<AtomicF64, Vec<String>>>>>,
94    defs: PgSourceMetricDefs,
95}
96
97impl PgSnapshotMetrics {
98    pub(crate) fn record_table_count_latency(&self, table_name: String, latency: f64) {
99        let latency_gauge = self
100            .defs
101            .table_count_latency
102            .get_delete_on_drop_metric(vec![self.source_id.to_string(), table_name]);
103        latency_gauge.set(latency);
104        self.gauges.lock().expect("poisoned").push(latency_gauge)
105    }
106}
107
108/// Metrics for Postgres sources.
109pub(crate) struct PgSourceMetrics {
110    pub(crate) inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
111    pub(crate) updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
112    pub(crate) deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
113    pub(crate) ignored: DeleteOnDropCounter<AtomicU64, Vec<String>>,
114    pub(crate) total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
115    pub(crate) transactions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
116    pub(crate) tables: DeleteOnDropGauge<AtomicU64, Vec<String>>,
117    pub(crate) lsn: DeleteOnDropGauge<AtomicU64, Vec<String>>,
118
119    pub(crate) snapshot_metrics: PgSnapshotMetrics,
120}
121
122impl PgSourceMetrics {
123    /// Create a `PgSourceMetrics` from the `PgSourceMetricDefs`.
124    pub(crate) fn new(defs: &PgSourceMetricDefs, source_id: GlobalId) -> Self {
125        let labels = &[source_id.to_string()];
126        Self {
127            inserts: defs
128                .insert_messages
129                .get_delete_on_drop_metric(labels.to_vec()),
130            updates: defs
131                .update_messages
132                .get_delete_on_drop_metric(labels.to_vec()),
133            deletes: defs
134                .delete_messages
135                .get_delete_on_drop_metric(labels.to_vec()),
136            ignored: defs
137                .ignored_messages
138                .get_delete_on_drop_metric(labels.to_vec()),
139            total: defs
140                .total_messages
141                .get_delete_on_drop_metric(labels.to_vec()),
142            transactions: defs.transactions.get_delete_on_drop_metric(labels.to_vec()),
143            tables: defs
144                .tables_in_publication
145                .get_delete_on_drop_metric(labels.to_vec()),
146            lsn: defs.wal_lsn.get_delete_on_drop_metric(labels.to_vec()),
147            snapshot_metrics: PgSnapshotMetrics {
148                source_id,
149                gauges: Default::default(),
150                defs: defs.clone(),
151            },
152        }
153    }
154}