Skip to main content

mz_mysql_util/
replication.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use mysql_async::Conn;
11use mysql_async::prelude::Queryable;
12
13use crate::MySqlError;
14
15/// Query a MySQL System Variable
16pub async fn query_sys_var(conn: &mut Conn, name: &str) -> Result<String, MySqlError> {
17    if !is_safe_sys_var_name(name) {
18        return Err(anyhow::anyhow!("invalid MySQL system variable name: {name}").into());
19    }
20
21    // System variable names are interpolated unparameterized, so this uses
22    // `query_first` rather than the prepared `exec_*` family. The
23    // `is_safe_sys_var_name` allowlist above keeps it safe.
24    #[allow(clippy::disallowed_methods)]
25    let value: String = conn.query_first(format!("SELECT @@{name}")).await?.unwrap();
26    Ok(value)
27}
28
/// Returns `true` when `name` is safe to interpolate into a `SELECT @@{name}` query.
///
/// A name is accepted when it consists of one or more `.`-separated segments, every
/// segment is non-empty, and every character is an ASCII letter, an ASCII digit, `_`,
/// or `$`. Everything else (including the empty string) is rejected.
fn is_safe_sys_var_name(name: &str) -> bool {
    /// Characters permitted inside a single segment of the name.
    fn is_allowed(ch: char) -> bool {
        matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '$')
    }

    // Splitting an empty string yields a single empty segment, so the empty name is
    // rejected by the same check that rejects leading, trailing, or doubled dots.
    for part in name.split('.') {
        if part.is_empty() || !part.chars().all(is_allowed) {
            return false;
        }
    }
    true
}
38
39/// Verify a MySQL System Variable matches the expected value
40async fn verify_sys_setting(
41    conn: &mut Conn,
42    setting: &str,
43    expected: &str,
44) -> Result<(), MySqlError> {
45    match query_sys_var(conn, setting).await?.as_str() {
46        actual if actual == expected => Ok(()),
47        actual => Err(MySqlError::InvalidSystemSetting {
48            setting: setting.to_string(),
49            expected: expected.to_string(),
50            actual: actual.to_string(),
51        }),
52    }
53}
54
55pub async fn ensure_full_row_binlog_format(conn: &mut Conn) -> Result<(), MySqlError> {
56    verify_sys_setting(conn, "log_bin", "1").await?;
57    verify_sys_setting(conn, "binlog_format", "ROW").await?;
58    verify_sys_setting(conn, "binlog_row_image", "FULL").await?;
59    Ok(())
60}
61
62pub async fn ensure_gtid_consistency(conn: &mut Conn) -> Result<(), MySqlError> {
63    verify_sys_setting(conn, "gtid_mode", "ON").await?;
64    verify_sys_setting(conn, "enforce_gtid_consistency", "ON").await?;
65    verify_sys_setting(conn, "gtid_next", "AUTOMATIC").await?;
66    Ok(())
67}
68
69/// In case this is a MySQL replica, we ensure that the replication settings are such that
70/// the replica would commit all transactions in the order they were committed on the primary.
71/// We don't really know that this is a replica, but if the settings indicate multi-threaded
72/// replication and the preserve-commit-order setting is not on, then it _could_ be a replica
73/// with correctness issues.
74/// We used to check `performance_schema.replication_connection_configuration` to determine if
75/// this was in-fact a replica but that requires non-standard privileges.
76/// Before MySQL 8.0.27, single-threaded was default and preserve-commit-order was not, and after
77/// 8.0.27 multi-threaded is default and preserve-commit-order is default on. So both of those
78/// default scenarios are fine. Unfortunately on some versions of MySQL on RDS, the default
79/// parameters use multi-threading without the preserve-commit-order setting on.
80pub async fn ensure_replication_commit_order(conn: &mut Conn) -> Result<(), MySqlError> {
81    // This system variables were renamed between MySQL 5.7 and 8.0
82    let is_multi_threaded = match query_sys_var(conn, "replica_parallel_workers").await {
83        Ok(val) => val != "0" && val != "1",
84        Err(_) => match query_sys_var(conn, "slave_parallel_workers").await {
85            Ok(val) => val != "0" && val != "1",
86            Err(err) => return Err(err),
87        },
88    };
89
90    if is_multi_threaded {
91        match verify_sys_setting(conn, "replica_preserve_commit_order", "1").await {
92            Ok(_) => Ok(()),
93            Err(_) => verify_sys_setting(conn, "slave_preserve_commit_order", "1").await,
94        }
95    } else {
96        Ok(())
97    }
98}