// mz_storage/metrics/source/sql_server.rs
use std::collections::BTreeMap;
13use std::rc::Rc;
14use std::sync::Mutex;
15
16use mz_ore::metric;
17use mz_ore::metrics::{
18 DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, IntGaugeVec, MetricsRegistry,
19 UIntGaugeVec,
20};
21use mz_repr::GlobalId;
22use prometheus::core::{AtomicF64, AtomicI64, AtomicU64};
23
24#[derive(Clone, Debug)]
26pub(crate) struct SqlServerSourceMetricDefs {
27 pub(crate) ignored_messages: IntCounterVec,
28 pub(crate) insert_messages: IntCounterVec,
29 pub(crate) update_messages: IntCounterVec,
30 pub(crate) delete_messages: IntCounterVec,
31 pub(crate) snapshot_table_count: UIntGaugeVec,
32 pub(crate) snapshot_table_size_latency: GaugeVec,
33 pub(crate) snapshot_table_lock: IntGaugeVec,
34}
35
36impl SqlServerSourceMetricDefs {
37 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
38 Self {
44 ignored_messages: registry.register(metric!(
45 name: "mz_sql_server_per_source_ignored_messages",
46 help: "The number of messages ignored because of an irrelevant type or relation_id",
47 var_labels: ["source_id", "worker_id"],
48 )),
49 insert_messages: registry.register(metric!(
50 name: "mz_sql_server_per_source_inserts",
51 help: "The number of inserts for all tables in this source",
52 var_labels: ["source_id", "worker_id"],
53 )),
54 update_messages: registry.register(metric!(
55 name: "mz_sql_server_per_source_updates",
56 help: "The number of updates for all tables in this source",
57 var_labels: ["source_id", "worker_id"],
58 )),
59 delete_messages: registry.register(metric!(
60 name: "mz_sql_server_per_source_deletes",
61 help: "The number of deletes for all tables in this source",
62 var_labels: ["source_id", "worker_id"],
63 )),
64 snapshot_table_count: registry.register(metric!(
65 name: "mz_sql_server_snapshot_table_count",
66 help: "The number of tables that SQL Server still needs to snapshot",
67 var_labels: ["source_id", "worker_id"],
68 )),
69 snapshot_table_size_latency: registry.register(metric!(
70 name: "mz_sql_server_snapshot_count_latency",
71 help: "The wall time used to obtain snapshot sizes.",
72 var_labels: ["source_id", "worker_id", "table_name"],
73 )),
74 snapshot_table_lock: registry.register(metric!(
75 name: "mz_sql_server_snapshot_table_lock",
76 help: "The upstream tables locked for snapshot.",
77 var_labels: ["source_id", "worker_id", "table_name"],
78 )),
79 }
80 }
81}
82#[derive(Clone)]
83pub(crate) struct SqlServerSourceMetrics {
85 source_id: String,
87 worker_id: String,
88 defs: SqlServerSourceMetricDefs,
89 snapshot_table_size_latency:
91 Rc<Mutex<BTreeMap<String, DeleteOnDropGauge<AtomicF64, Vec<String>>>>>,
92 snapshot_table_lock_count:
93 Rc<Mutex<BTreeMap<String, DeleteOnDropGauge<AtomicI64, Vec<String>>>>>,
94
95 pub(crate) inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
96 pub(crate) updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
97 pub(crate) deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
98 pub(crate) ignored: DeleteOnDropCounter<AtomicU64, Vec<String>>,
99 pub(crate) snapshot_table_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
100}
101
102impl SqlServerSourceMetrics {
103 pub(crate) fn new(
105 defs: &SqlServerSourceMetricDefs,
106 source_id: GlobalId,
107 worker_id: usize,
108 ) -> Self {
109 let source_id_labels = &[source_id.to_string(), worker_id.to_string()];
110 Self {
111 source_id: source_id.to_string(),
112 worker_id: worker_id.to_string(),
113 defs: defs.clone(),
114 inserts: defs
115 .insert_messages
116 .get_delete_on_drop_metric(source_id_labels.to_vec()),
117 updates: defs
118 .update_messages
119 .get_delete_on_drop_metric(source_id_labels.to_vec()),
120 deletes: defs
121 .delete_messages
122 .get_delete_on_drop_metric(source_id_labels.to_vec()),
123 ignored: defs
124 .ignored_messages
125 .get_delete_on_drop_metric(source_id_labels.to_vec()),
126 snapshot_table_count: defs
127 .snapshot_table_count
128 .get_delete_on_drop_metric(source_id_labels.to_vec()),
129 snapshot_table_size_latency: Default::default(),
130 snapshot_table_lock_count: Default::default(),
131 }
132 }
133
134 pub fn set_snapshot_table_size_latency(&self, table_name: &str, latency: f64) {
135 let mut snapshot_table_size_latency =
136 self.snapshot_table_size_latency.lock().expect("poisoned");
137 match snapshot_table_size_latency.entry(table_name.to_string()) {
138 std::collections::btree_map::Entry::Vacant(vacant_entry) => {
139 let labels = vec![
140 self.source_id.clone(),
141 self.worker_id.clone(),
142 table_name.to_string(),
143 ];
144 let metric = self
145 .defs
146 .snapshot_table_size_latency
147 .get_delete_on_drop_metric(labels);
148 vacant_entry.insert(metric).set(latency);
149 }
150 std::collections::btree_map::Entry::Occupied(occupied_entry) => {
151 occupied_entry.get().set(latency)
152 }
153 }
154 }
155
156 pub fn update_snapshot_table_lock_count(&self, table_name: &str, delta: i64) {
157 let mut snapshot_table_lock_count =
158 self.snapshot_table_lock_count.lock().expect("poisoned");
159 match snapshot_table_lock_count.entry(table_name.to_string()) {
160 std::collections::btree_map::Entry::Vacant(vacant_entry) => {
161 let labels = vec![
162 self.source_id.clone(),
163 self.worker_id.clone(),
164 table_name.to_string(),
165 ];
166 let metric = self
167 .defs
168 .snapshot_table_lock
169 .get_delete_on_drop_metric(labels);
170 vacant_entry.insert(metric).add(delta);
171 }
172 std::collections::btree_map::Entry::Occupied(occupied_entry) => {
173 occupied_entry.get().add(delta);
174 }
175 }
176 }
177}