mz_storage/metrics/source/
mysql.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for MySQL.
11
12use std::sync::Arc;
13use std::sync::Mutex;
14
15use mz_ore::metric;
16use mz_ore::metrics::{
17    DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry, UIntGaugeVec,
18};
19use mz_repr::GlobalId;
20use prometheus::core::{AtomicF64, AtomicU64};
21
22#[derive(Clone, Debug)]
23pub(crate) struct MySqlSourceMetricDefs {
24    pub(crate) total_messages: IntCounterVec,
25    pub(crate) ignored_messages: IntCounterVec,
26    pub(crate) insert_rows: IntCounterVec,
27    pub(crate) update_rows: IntCounterVec,
28    pub(crate) delete_rows: IntCounterVec,
29    pub(crate) tables: UIntGaugeVec,
30    pub(crate) gtid_txids: UIntGaugeVec,
31
32    pub(crate) snapshot_defs: MySqlSnapshotMetricDefs,
33}
34
35impl MySqlSourceMetricDefs {
36    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
37        Self {
38            total_messages: registry.register(metric!(
39                name: "mz_mysql_per_source_messages_total",
40                help: "The total number of replication messages for this source, not expected to be the sum of the other values.",
41                var_labels: ["source_id"],
42            )),
43            ignored_messages: registry.register(metric!(
44                name: "mz_mysql_per_source_ignored_messages",
45                help: "The number of messages ignored because of an irrelevant type or relation_id",
46                var_labels: ["source_id"],
47            )),
48            insert_rows: registry.register(metric!(
49                name: "mz_mysql_per_source_inserts",
50                help: "The number of inserts for all tables in this source",
51                var_labels: ["source_id"],
52            )),
53            update_rows: registry.register(metric!(
54                name: "mz_mysql_per_source_updates",
55                help: "The number of updates for all tables in this source",
56                var_labels: ["source_id"],
57            )),
58            delete_rows: registry.register(metric!(
59                name: "mz_mysql_per_source_deletes",
60                help: "The number of deletes for all tables in this source",
61                var_labels: ["source_id"],
62            )),
63            tables: registry.register(metric!(
64                name: "mz_mysql_per_source_tables_count",
65                help: "The number of upstream tables for this source",
66                var_labels: ["source_id"],
67            )),
68            gtid_txids: registry.register(metric!(
69                name: "mz_mysql_sum_gtid_txns",
70                help: "The sum of all transaction-ids committed for each GTID Source-ID UUID seen for this source",
71                var_labels: ["source_id"],
72            )),
73            snapshot_defs: MySqlSnapshotMetricDefs::register_with(registry),
74        }
75    }
76}
77
78/// Metrics for MySql sources.
79pub(crate) struct MySqlSourceMetrics {
80    pub(crate) inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
81    pub(crate) updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
82    pub(crate) deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
83    pub(crate) ignored: DeleteOnDropCounter<AtomicU64, Vec<String>>,
84    pub(crate) total: DeleteOnDropCounter<AtomicU64, Vec<String>>,
85    pub(crate) tables: DeleteOnDropGauge<AtomicU64, Vec<String>>,
86    pub(crate) gtid_txids: DeleteOnDropGauge<AtomicU64, Vec<String>>,
87    pub(crate) snapshot_metrics: MySqlSnapshotMetrics,
88}
89
90impl MySqlSourceMetrics {
91    /// Create a `MySqlSourceMetrics` from the `MySqlSourceMetricDefs`.
92    pub(crate) fn new(defs: &MySqlSourceMetricDefs, source_id: GlobalId) -> Self {
93        let labels = &[source_id.to_string()];
94        Self {
95            inserts: defs.insert_rows.get_delete_on_drop_metric(labels.to_vec()),
96            updates: defs.update_rows.get_delete_on_drop_metric(labels.to_vec()),
97            deletes: defs.delete_rows.get_delete_on_drop_metric(labels.to_vec()),
98            ignored: defs
99                .ignored_messages
100                .get_delete_on_drop_metric(labels.to_vec()),
101            total: defs
102                .total_messages
103                .get_delete_on_drop_metric(labels.to_vec()),
104            tables: defs.tables.get_delete_on_drop_metric(labels.to_vec()),
105            gtid_txids: defs.gtid_txids.get_delete_on_drop_metric(labels.to_vec()),
106            snapshot_metrics: MySqlSnapshotMetrics {
107                source_id,
108                gauges: Default::default(),
109                defs: defs.snapshot_defs.clone(),
110            },
111        }
112    }
113}
114
115#[derive(Clone, Debug)]
116pub(crate) struct MySqlSnapshotMetricDefs {
117    pub(crate) table_count_latency: GaugeVec,
118}
119
120impl MySqlSnapshotMetricDefs {
121    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
122        Self {
123            table_count_latency: registry.register(metric!(
124                name: "mz_mysql_snapshot_count_latency",
125                help: "The wall time used to obtain snapshot sizes.",
126                var_labels: ["source_id", "table_name", "schema"],
127            )),
128        }
129    }
130}
131
132#[derive(Clone)]
133pub(crate) struct MySqlSnapshotMetrics {
134    source_id: GlobalId,
135    // This has to be shared between tokio tasks and the replication operator, as the collection
136    // of these metrics happens once in those tasks, which do not live long enough to keep them
137    // alive.
138    gauges: Arc<Mutex<Vec<DeleteOnDropGauge<AtomicF64, Vec<String>>>>>,
139    defs: MySqlSnapshotMetricDefs,
140}
141
142impl MySqlSnapshotMetrics {
143    pub(crate) fn record_table_count_latency(
144        &self,
145        table_name: String,
146        schema: String,
147        latency: f64,
148    ) {
149        let latency_gauge = self
150            .defs
151            .table_count_latency
152            .get_delete_on_drop_metric(vec![self.source_id.to_string(), table_name, schema]);
153        latency_gauge.set(latency);
154        self.gauges.lock().expect("poisoned").push(latency_gauge)
155    }
156}