mz_mysql_util/
replication.rs1use mysql_async::Conn;
11use mysql_async::prelude::Queryable;
12
13use crate::MySqlError;
14
15pub async fn query_sys_var(conn: &mut Conn, name: &str) -> Result<String, MySqlError> {
17 if !is_safe_sys_var_name(name) {
18 return Err(anyhow::anyhow!("invalid MySQL system variable name: {name}").into());
19 }
20
21 #[allow(clippy::disallowed_methods)]
25 let value: String = conn.query_first(format!("SELECT @@{name}")).await?.unwrap();
26 Ok(value)
27}
28
/// Returns `true` when `name` is safe to splice into a `SELECT @@<name>` query.
///
/// A name is accepted when it is non-empty and every `.`-separated segment is
/// itself non-empty and made up solely of ASCII letters, ASCII digits, `_`
/// or `$`.
fn is_safe_sys_var_name(name: &str) -> bool {
    // Multi-byte UTF-8 sequences consist only of bytes >= 0x80, so checking
    // bytes rejects exactly the same inputs as checking chars would.
    fn is_allowed(byte: u8) -> bool {
        byte.is_ascii_alphanumeric() || matches!(byte, b'_' | b'$')
    }

    if name.is_empty() {
        return false;
    }

    for part in name.split('.') {
        if part.is_empty() || !part.bytes().all(is_allowed) {
            return false;
        }
    }
    true
}
38
39async fn verify_sys_setting(
41 conn: &mut Conn,
42 setting: &str,
43 expected: &str,
44) -> Result<(), MySqlError> {
45 match query_sys_var(conn, setting).await?.as_str() {
46 actual if actual == expected => Ok(()),
47 actual => Err(MySqlError::InvalidSystemSetting {
48 setting: setting.to_string(),
49 expected: expected.to_string(),
50 actual: actual.to_string(),
51 }),
52 }
53}
54
55pub async fn ensure_full_row_binlog_format(conn: &mut Conn) -> Result<(), MySqlError> {
56 verify_sys_setting(conn, "log_bin", "1").await?;
57 verify_sys_setting(conn, "binlog_format", "ROW").await?;
58 verify_sys_setting(conn, "binlog_row_image", "FULL").await?;
59 Ok(())
60}
61
62pub async fn ensure_gtid_consistency(conn: &mut Conn) -> Result<(), MySqlError> {
63 verify_sys_setting(conn, "gtid_mode", "ON").await?;
64 verify_sys_setting(conn, "enforce_gtid_consistency", "ON").await?;
65 verify_sys_setting(conn, "gtid_next", "AUTOMATIC").await?;
66 Ok(())
67}
68
69pub async fn ensure_replication_commit_order(conn: &mut Conn) -> Result<(), MySqlError> {
81 let is_multi_threaded = match query_sys_var(conn, "replica_parallel_workers").await {
83 Ok(val) => val != "0" && val != "1",
84 Err(_) => match query_sys_var(conn, "slave_parallel_workers").await {
85 Ok(val) => val != "0" && val != "1",
86 Err(err) => return Err(err),
87 },
88 };
89
90 if is_multi_threaded {
91 match verify_sys_setting(conn, "replica_preserve_commit_order", "1").await {
92 Ok(_) => Ok(()),
93 Err(_) => verify_sys_setting(conn, "slave_preserve_commit_order", "1").await,
94 }
95 } else {
96 Ok(())
97 }
98}