mz_mysql_util/
replication.rs1use mysql_async::Conn;
11use mysql_async::prelude::Queryable;
12
13use crate::MySqlError;
14
15pub async fn query_sys_var(conn: &mut Conn, name: &str) -> Result<String, MySqlError> {
17 let value: String = conn
18 .query_first(format!("SELECT @@{}", name))
19 .await?
20 .unwrap();
21 Ok(value)
22}
23
24async fn verify_sys_setting(
26 conn: &mut Conn,
27 setting: &str,
28 expected: &str,
29) -> Result<(), MySqlError> {
30 match query_sys_var(conn, setting).await?.as_str() {
31 actual if actual == expected => Ok(()),
32 actual => Err(MySqlError::InvalidSystemSetting {
33 setting: setting.to_string(),
34 expected: expected.to_string(),
35 actual: actual.to_string(),
36 }),
37 }
38}
39
40pub async fn ensure_full_row_binlog_format(conn: &mut Conn) -> Result<(), MySqlError> {
41 verify_sys_setting(conn, "log_bin", "1").await?;
42 verify_sys_setting(conn, "binlog_format", "ROW").await?;
43 verify_sys_setting(conn, "binlog_row_image", "FULL").await?;
44 Ok(())
45}
46
47pub async fn ensure_gtid_consistency(conn: &mut Conn) -> Result<(), MySqlError> {
48 verify_sys_setting(conn, "gtid_mode", "ON").await?;
49 verify_sys_setting(conn, "enforce_gtid_consistency", "ON").await?;
50 verify_sys_setting(conn, "gtid_next", "AUTOMATIC").await?;
51 Ok(())
52}
53
54pub async fn ensure_replication_commit_order(conn: &mut Conn) -> Result<(), MySqlError> {
66 let is_multi_threaded = match query_sys_var(conn, "replica_parallel_workers").await {
68 Ok(val) => val != "0" && val != "1",
69 Err(_) => match query_sys_var(conn, "slave_parallel_workers").await {
70 Ok(val) => val != "0" && val != "1",
71 Err(err) => return Err(err),
72 },
73 };
74
75 if is_multi_threaded {
76 match verify_sys_setting(conn, "replica_preserve_commit_order", "1").await {
77 Ok(_) => Ok(()),
78 Err(_) => verify_sys_setting(conn, "slave_preserve_commit_order", "1").await,
79 }
80 } else {
81 Ok(())
82 }
83}