1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Metrics for MySQL.
1112use std::sync::Arc;
13use std::sync::Mutex;
1415use mz_ore::metric;
16use mz_ore::metrics::{
17 DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry, UIntGaugeVec,
18};
19use mz_repr::GlobalId;
20use prometheus::core::{AtomicF64, AtomicU64};
2122#[derive(Clone, Debug)]
23pub(crate) struct MySqlSourceMetricDefs {
24pub(crate) total_messages: IntCounterVec,
25pub(crate) ignored_messages: IntCounterVec,
26pub(crate) insert_rows: IntCounterVec,
27pub(crate) update_rows: IntCounterVec,
28pub(crate) delete_rows: IntCounterVec,
29pub(crate) tables: UIntGaugeVec,
30pub(crate) gtid_txids: UIntGaugeVec,
3132pub(crate) snapshot_defs: MySqlSnapshotMetricDefs,
33}
3435impl MySqlSourceMetricDefs {
36pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
37Self {
38 total_messages: registry.register(metric!(
39 name: "mz_mysql_per_source_messages_total",
40 help: "The total number of replication messages for this source, not expected to be the sum of the other values.",
41 var_labels: ["source_id"],
42 )),
43 ignored_messages: registry.register(metric!(
44 name: "mz_mysql_per_source_ignored_messages",
45 help: "The number of messages ignored because of an irrelevant type or relation_id",
46 var_labels: ["source_id"],
47 )),
48 insert_rows: registry.register(metric!(
49 name: "mz_mysql_per_source_inserts",
50 help: "The number of inserts for all tables in this source",
51 var_labels: ["source_id"],
52 )),
53 update_rows: registry.register(metric!(
54 name: "mz_mysql_per_source_updates",
55 help: "The number of updates for all tables in this source",
56 var_labels: ["source_id"],
57 )),
58 delete_rows: registry.register(metric!(
59 name: "mz_mysql_per_source_deletes",
60 help: "The number of deletes for all tables in this source",
61 var_labels: ["source_id"],
62 )),
63 tables: registry.register(metric!(
64 name: "mz_mysql_per_source_tables_count",
65 help: "The number of upstream tables for this source",
66 var_labels: ["source_id"],
67 )),
68 gtid_txids: registry.register(metric!(
69 name: "mz_mysql_sum_gtid_txns",
70 help: "The sum of all transaction-ids committed for each GTID Source-ID UUID seen for this source",
71 var_labels: ["source_id"],
72 )),
73 snapshot_defs: MySqlSnapshotMetricDefs::register_with(registry),
74 }
75 }
76}
7778/// Metrics for MySql sources.
79pub(crate) struct MySqlSourceMetrics {
80pub(crate) inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
81pub(crate) updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
82pub(crate) deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
83pub(crate) ignored: DeleteOnDropCounter<AtomicU64, Vec<String>>,
84pub(crate) total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
85pub(crate) tables: DeleteOnDropGauge<AtomicU64, Vec<String>>,
86pub(crate) gtid_txids: DeleteOnDropGauge<AtomicU64, Vec<String>>,
87pub(crate) snapshot_metrics: MySqlSnapshotMetrics,
88}
8990impl MySqlSourceMetrics {
91/// Create a `MySqlSourceMetrics` from the `MySqlSourceMetricDefs`.
92pub(crate) fn new(defs: &MySqlSourceMetricDefs, source_id: GlobalId) -> Self {
93let labels = &[source_id.to_string()];
94Self {
95 inserts: defs.insert_rows.get_delete_on_drop_metric(labels.to_vec()),
96 updates: defs.update_rows.get_delete_on_drop_metric(labels.to_vec()),
97 deletes: defs.delete_rows.get_delete_on_drop_metric(labels.to_vec()),
98 ignored: defs
99 .ignored_messages
100 .get_delete_on_drop_metric(labels.to_vec()),
101 total: defs
102 .total_messages
103 .get_delete_on_drop_metric(labels.to_vec()),
104 tables: defs.tables.get_delete_on_drop_metric(labels.to_vec()),
105 gtid_txids: defs.gtid_txids.get_delete_on_drop_metric(labels.to_vec()),
106 snapshot_metrics: MySqlSnapshotMetrics {
107 source_id,
108 gauges: Default::default(),
109 defs: defs.snapshot_defs.clone(),
110 },
111 }
112 }
113}
114115#[derive(Clone, Debug)]
116pub(crate) struct MySqlSnapshotMetricDefs {
117pub(crate) table_count_latency: GaugeVec,
118}
119120impl MySqlSnapshotMetricDefs {
121pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
122Self {
123 table_count_latency: registry.register(metric!(
124 name: "mz_mysql_snapshot_count_latency",
125 help: "The wall time used to obtain snapshot sizes.",
126 var_labels: ["source_id", "table_name", "schema"],
127 )),
128 }
129 }
130}
131132#[derive(Clone)]
133pub(crate) struct MySqlSnapshotMetrics {
134 source_id: GlobalId,
135// This has to be shared between tokio tasks and the replication operator, as the collection
136 // of these metrics happens once in those tasks, which do not live long enough to keep them
137 // alive.
138gauges: Arc<Mutex<Vec<DeleteOnDropGauge<AtomicF64, Vec<String>>>>>,
139 defs: MySqlSnapshotMetricDefs,
140}
141142impl MySqlSnapshotMetrics {
143pub(crate) fn record_table_count_latency(
144&self,
145 table_name: String,
146 schema: String,
147 latency: f64,
148 ) {
149let latency_gauge = self
150.defs
151 .table_count_latency
152 .get_delete_on_drop_metric(vec![self.source_id.to_string(), table_name, schema]);
153 latency_gauge.set(latency);
154self.gauges.lock().expect("poisoned").push(latency_gauge)
155 }
156}