mz_storage/metrics/source/
sql_server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Metrics for SQL Server sources.
11
12use std::collections::BTreeMap;
13use std::rc::Rc;
14use std::sync::Mutex;
15
16use mz_ore::metric;
17use mz_ore::metrics::{
18    DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, IntGaugeVec, MetricsRegistry,
19    UIntGaugeVec,
20};
21use mz_repr::GlobalId;
22use prometheus::core::{AtomicF64, AtomicI64, AtomicU64};
23
24/// Definitions for Postgres source metrics.
25#[derive(Clone, Debug)]
26pub(crate) struct SqlServerSourceMetricDefs {
27    pub(crate) ignored_messages: IntCounterVec,
28    pub(crate) insert_messages: IntCounterVec,
29    pub(crate) update_messages: IntCounterVec,
30    pub(crate) delete_messages: IntCounterVec,
31    pub(crate) snapshot_table_count: UIntGaugeVec,
32    pub(crate) snapshot_table_size_latency: GaugeVec,
33    pub(crate) snapshot_table_lock: IntGaugeVec,
34}
35
36impl SqlServerSourceMetricDefs {
37    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
38        // Every metric must have a worker specific id associated with it. These are later wrapped
39        // in a DeleteOnDrop helper. If the label was just `source_id` and one worker completed, it
40        // would call the DeleteOnDrop code that deregestiers the metric for `source_id`. Other
41        // workers may still be running, but the metrics registry will no longer record or report
42        // metrics for that `source_id`.
43        Self {
44            ignored_messages: registry.register(metric!(
45                name: "mz_sql_server_per_source_ignored_messages",
46                help: "The number of messages ignored because of an irrelevant type or relation_id",
47                var_labels: ["source_id", "worker_id"],
48            )),
49            insert_messages: registry.register(metric!(
50                name: "mz_sql_server_per_source_inserts",
51                help: "The number of inserts for all tables in this source",
52                var_labels: ["source_id", "worker_id"],
53            )),
54            update_messages: registry.register(metric!(
55                name: "mz_sql_server_per_source_updates",
56                help: "The number of updates for all tables in this source",
57                var_labels: ["source_id", "worker_id"],
58            )),
59            delete_messages: registry.register(metric!(
60                name: "mz_sql_server_per_source_deletes",
61                help: "The number of deletes for all tables in this source",
62                var_labels: ["source_id", "worker_id"],
63            )),
64            snapshot_table_count: registry.register(metric!(
65                name: "mz_sql_server_snapshot_table_count",
66                help: "The number of tables that SQL Server still needs to snapshot",
67                var_labels: ["source_id", "worker_id"],
68            )),
69            snapshot_table_size_latency: registry.register(metric!(
70                name: "mz_sql_server_snapshot_count_latency",
71                help: "The wall time used to obtain snapshot sizes.",
72                var_labels: ["source_id", "worker_id", "table_name"],
73            )),
74            snapshot_table_lock: registry.register(metric!(
75                name: "mz_sql_server_snapshot_table_lock",
76                help: "The upstream tables locked for snapshot.",
77                var_labels: ["source_id", "worker_id", "table_name"],
78            )),
79        }
80    }
81}
82#[derive(Clone)]
83/// Metrics for Postgres sources.
84pub(crate) struct SqlServerSourceMetrics {
85    // stored as String to avoid having to convert them repeatedly.
86    source_id: String,
87    worker_id: String,
88    defs: SqlServerSourceMetricDefs,
89    // Currently, this structure is not accessed across threads.
90    snapshot_table_size_latency:
91        Rc<Mutex<BTreeMap<String, DeleteOnDropGauge<AtomicF64, Vec<String>>>>>,
92    snapshot_table_lock_count:
93        Rc<Mutex<BTreeMap<String, DeleteOnDropGauge<AtomicI64, Vec<String>>>>>,
94
95    pub(crate) inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
96    pub(crate) updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
97    pub(crate) deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
98    pub(crate) ignored: DeleteOnDropCounter<AtomicU64, Vec<String>>,
99    pub(crate) snapshot_table_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
100}
101
102impl SqlServerSourceMetrics {
103    /// Create a `SqlServerSourceMetrics` from the `SqlServerSourceMetricDefs`.
104    pub(crate) fn new(
105        defs: &SqlServerSourceMetricDefs,
106        source_id: GlobalId,
107        worker_id: usize,
108    ) -> Self {
109        let source_id_labels = &[source_id.to_string(), worker_id.to_string()];
110        Self {
111            source_id: source_id.to_string(),
112            worker_id: worker_id.to_string(),
113            defs: defs.clone(),
114            inserts: defs
115                .insert_messages
116                .get_delete_on_drop_metric(source_id_labels.to_vec()),
117            updates: defs
118                .update_messages
119                .get_delete_on_drop_metric(source_id_labels.to_vec()),
120            deletes: defs
121                .delete_messages
122                .get_delete_on_drop_metric(source_id_labels.to_vec()),
123            ignored: defs
124                .ignored_messages
125                .get_delete_on_drop_metric(source_id_labels.to_vec()),
126            snapshot_table_count: defs
127                .snapshot_table_count
128                .get_delete_on_drop_metric(source_id_labels.to_vec()),
129            snapshot_table_size_latency: Default::default(),
130            snapshot_table_lock_count: Default::default(),
131        }
132    }
133
134    pub fn set_snapshot_table_size_latency(&self, table_name: &str, latency: f64) {
135        let mut snapshot_table_size_latency =
136            self.snapshot_table_size_latency.lock().expect("poisoned");
137        match snapshot_table_size_latency.entry(table_name.to_string()) {
138            std::collections::btree_map::Entry::Vacant(vacant_entry) => {
139                let labels = vec![
140                    self.source_id.clone(),
141                    self.worker_id.clone(),
142                    table_name.to_string(),
143                ];
144                let metric = self
145                    .defs
146                    .snapshot_table_size_latency
147                    .get_delete_on_drop_metric(labels);
148                vacant_entry.insert(metric).set(latency);
149            }
150            std::collections::btree_map::Entry::Occupied(occupied_entry) => {
151                occupied_entry.get().set(latency)
152            }
153        }
154    }
155
156    pub fn update_snapshot_table_lock_count(&self, table_name: &str, delta: i64) {
157        let mut snapshot_table_lock_count =
158            self.snapshot_table_lock_count.lock().expect("poisoned");
159        match snapshot_table_lock_count.entry(table_name.to_string()) {
160            std::collections::btree_map::Entry::Vacant(vacant_entry) => {
161                let labels = vec![
162                    self.source_id.clone(),
163                    self.worker_id.clone(),
164                    table_name.to_string(),
165                ];
166                let metric = self
167                    .defs
168                    .snapshot_table_lock
169                    .get_delete_on_drop_metric(labels);
170                vacant_entry.insert(metric).add(delta);
171            }
172            std::collections::btree_map::Entry::Occupied(occupied_entry) => {
173                occupied_entry.get().add(delta);
174            }
175        }
176    }
177}