mz_storage/metrics/source/
postgres.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
//! Metrics for Postgres sources.
11
12use mz_ore::metric;
13use mz_ore::metrics::{
14    DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry, UIntGaugeVec,
15};
16use mz_repr::GlobalId;
17use prometheus::core::{AtomicF64, AtomicU64};
18use std::sync::Arc;
19use std::sync::Mutex;
20
21/// Definitions for Postgres source metrics.
22#[derive(Clone, Debug)]
23pub(crate) struct PgSourceMetricDefs {
24    pub(crate) total_messages: IntCounterVec,
25    pub(crate) transactions: IntCounterVec,
26    pub(crate) ignored_messages: IntCounterVec,
27    pub(crate) insert_messages: IntCounterVec,
28    pub(crate) update_messages: IntCounterVec,
29    pub(crate) delete_messages: IntCounterVec,
30    pub(crate) tables_in_publication: UIntGaugeVec,
31    pub(crate) wal_lsn: UIntGaugeVec,
32    pub(crate) table_count_latency: GaugeVec,
33}
34
35impl PgSourceMetricDefs {
36    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
37        Self {
38            total_messages: registry.register(metric!(
39                name: "mz_postgres_per_source_messages_total",
40                help: "The total number of replication messages for this source, not expected to be the sum of the other values.",
41                var_labels: ["source_id"],
42            )),
43            transactions: registry.register(metric!(
44                name: "mz_postgres_per_source_transactions_total",
45                help: "The number of committed transactions for all tables in this source",
46                var_labels: ["source_id"],
47            )),
48            ignored_messages: registry.register(metric!(
49                name: "mz_postgres_per_source_ignored_messages",
50                help: "The number of messages ignored because of an irrelevant type or relation_id",
51                var_labels: ["source_id"],
52            )),
53            insert_messages: registry.register(metric!(
54                name: "mz_postgres_per_source_inserts",
55                help: "The number of inserts for all tables in this source",
56                var_labels: ["source_id"],
57            )),
58            update_messages: registry.register(metric!(
59                name: "mz_postgres_per_source_updates",
60                help: "The number of updates for all tables in this source",
61                var_labels: ["source_id"],
62            )),
63            delete_messages: registry.register(metric!(
64                name: "mz_postgres_per_source_deletes",
65                help: "The number of deletes for all tables in this source",
66                var_labels: ["source_id"],
67            )),
68            tables_in_publication: registry.register(metric!(
69                name: "mz_postgres_per_source_tables_count",
70                help: "The number of upstream tables for this source",
71                var_labels: ["source_id"],
72            )),
73            wal_lsn: registry.register(metric!(
74                name: "mz_postgres_per_source_wal_lsn",
75                help: "LSN of the latest transaction committed for this source, see Postgres Replication docs for more details on LSN",
76                var_labels: ["source_id"],
77            )),
78            table_count_latency: registry.register(metric!(
79                name: "mz_postgres_snapshot_count_latency",
80                help: "The wall time used to obtain snapshot sizes.",
81                var_labels: ["source_id", "table_name", "strict"],
82            )),
83        }
84    }
85}
86
87#[derive(Clone)]
88pub(crate) struct PgSnapshotMetrics {
89    source_id: GlobalId,
90    // This has to be shared between tokio tasks and the replication operator, as the collection
91    // of these metrics happens once in those tasks, which do not live long enough to keep them
92    // alive.
93    gauges: Arc<Mutex<Vec<DeleteOnDropGauge<AtomicF64, Vec<String>>>>>,
94    defs: PgSourceMetricDefs,
95}
96
97impl PgSnapshotMetrics {
98    pub(crate) fn record_table_count_latency(
99        &self,
100        table_name: String,
101        latency: f64,
102        strict: bool,
103    ) {
104        let latency_gauge = self
105            .defs
106            .table_count_latency
107            .get_delete_on_drop_metric(vec![
108                self.source_id.to_string(),
109                table_name,
110                strict.to_string(),
111            ]);
112        latency_gauge.set(latency);
113        self.gauges.lock().expect("poisoned").push(latency_gauge)
114    }
115}
116
117/// Metrics for Postgres sources.
118pub(crate) struct PgSourceMetrics {
119    pub(crate) inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
120    pub(crate) updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
121    pub(crate) deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
122    pub(crate) ignored: DeleteOnDropCounter<AtomicU64, Vec<String>>,
123    pub(crate) total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
124    pub(crate) transactions: DeleteOnDropCounter<AtomicU64, Vec<String>>,
125    pub(crate) tables: DeleteOnDropGauge<AtomicU64, Vec<String>>,
126    pub(crate) lsn: DeleteOnDropGauge<AtomicU64, Vec<String>>,
127
128    pub(crate) snapshot_metrics: PgSnapshotMetrics,
129}
130
131impl PgSourceMetrics {
132    /// Create a `PgSourceMetrics` from the `PgSourceMetricDefs`.
133    pub(crate) fn new(defs: &PgSourceMetricDefs, source_id: GlobalId) -> Self {
134        let labels = &[source_id.to_string()];
135        Self {
136            inserts: defs
137                .insert_messages
138                .get_delete_on_drop_metric(labels.to_vec()),
139            updates: defs
140                .update_messages
141                .get_delete_on_drop_metric(labels.to_vec()),
142            deletes: defs
143                .delete_messages
144                .get_delete_on_drop_metric(labels.to_vec()),
145            ignored: defs
146                .ignored_messages
147                .get_delete_on_drop_metric(labels.to_vec()),
148            total: defs
149                .total_messages
150                .get_delete_on_drop_metric(labels.to_vec()),
151            transactions: defs.transactions.get_delete_on_drop_metric(labels.to_vec()),
152            tables: defs
153                .tables_in_publication
154                .get_delete_on_drop_metric(labels.to_vec()),
155            lsn: defs.wal_lsn.get_delete_on_drop_metric(labels.to_vec()),
156            snapshot_metrics: PgSnapshotMetrics {
157                source_id,
158                gauges: Default::default(),
159                defs: defs.clone(),
160            },
161        }
162    }
163}