mz_mysql_util/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! MySQL utility library.
11
12mod tunnel;
13use std::time::Duration;
14
15use aws_rds::RdsTokenError;
16pub use tunnel::{
17    Config, DEFAULT_CONNECT_TIMEOUT, DEFAULT_SNAPSHOT_LOCK_WAIT_TIMEOUT,
18    DEFAULT_SNAPSHOT_MAX_EXECUTION_TIME, DEFAULT_TCP_KEEPALIVE, MySqlConn, TimeoutConfig,
19    TunnelConfig,
20};
21
22mod desc;
23pub use desc::{
24    MySqlColumnDesc, MySqlKeyDesc, MySqlTableDesc, ProtoMySqlColumnDesc, ProtoMySqlKeyDesc,
25    ProtoMySqlTableDesc,
26};
27
28mod replication;
29pub use replication::{
30    ensure_full_row_binlog_format, ensure_gtid_consistency, ensure_replication_commit_order,
31    query_sys_var,
32};
33
34pub mod schemas;
35pub use schemas::{
36    MySqlTableSchema, QualifiedTableRef, SYSTEM_SCHEMAS, SchemaRequest, schema_info,
37};
38
39pub mod privileges;
40pub use privileges::validate_source_privileges;
41
42pub mod decoding;
43pub use decoding::pack_mysql_row;
44mod aws_rds;
45
/// A column whose MySQL data type is reported as unsupported.
///
/// The [`std::fmt::Display`] rendering is `'<table>.<column>' of type '<type>'`, followed by
/// ` represented as: '<intended type>'` when [`intended_type`](Self::intended_type) is set.
#[derive(Debug, Clone)]
pub struct UnsupportedDataType {
    /// Type of the offending column, printed after `of type` in the message.
    pub column_type: String,
    /// Name of the table that owns the column; printed verbatim before the column name.
    // NOTE(review): presumably schema-qualified (`schema.table`) — confirm against callers.
    pub qualified_table_name: String,
    /// Name of the offending column.
    pub column_name: String,
    /// Optional type the column was to be represented as; appended to the message when present.
    pub intended_type: Option<String>,
}

impl std::fmt::Display for UnsupportedDataType {
    /// Writes the shared `'<table>.<column>' of type '<type>'` prefix, then the optional
    /// `represented as` suffix.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self {
            column_type,
            qualified_table_name,
            column_name,
            intended_type,
        } = self;

        write!(
            f,
            "'{qualified_table_name}.{column_name}' of type '{column_type}'"
        )?;
        if let Some(target_type) = intended_type {
            write!(f, " represented as: '{target_type}'")?;
        }
        Ok(())
    }
}
70
/// A privilege that was found to be missing for a given table.
///
/// Rendered via [`std::fmt::Display`] as `Missing privilege '<privilege>' for '<table>'`.
#[derive(Debug, Clone)]
pub struct MissingPrivilege {
    /// Name of the missing privilege, printed verbatim in the message.
    pub privilege: String,
    /// Name of the table the privilege is missing for, printed verbatim in the message.
    // NOTE(review): presumably schema-qualified (`schema.table`) — confirm against callers.
    pub qualified_table_name: String,
}

impl std::fmt::Display for MissingPrivilege {
    /// Formats the privilege and table name into a single human-readable sentence.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self {
            privilege,
            qualified_table_name,
        } = self;
        write!(
            f,
            "Missing privilege '{privilege}' for '{qualified_table_name}'"
        )
    }
}
86
/// Errors produced by this MySQL utility crate.
///
/// `Display` and `std::error::Error` are generated by `thiserror` from the `#[error(...)]`
/// attributes below. Variants marked `#[from]` can be produced with `?` from their wrapped
/// error type.
#[derive(Debug, thiserror::Error)]
pub enum MySqlError {
    /// Privilege validation failed; carries every privilege found missing.
    /// The list is rendered with its `Debug` representation in the message.
    #[error("error validating privileges: {0:?}")]
    MissingPrivileges(Vec<MissingPrivilege>),
    /// A MySQL connection could not be created from the given configuration.
    /// The payload string is included verbatim in the message.
    #[error("error creating mysql connection with config: {0}")]
    InvalidClientConfig(String),
    /// Setting up SSH failed. The wrapped error is both printed in the message and exposed
    /// as the error's `source()`.
    #[error("error setting up ssh: {0}")]
    Ssh(#[source] anyhow::Error),
    /// A value could not be decoded for the named column of the named table.
    #[error("error decoding value for '{qualified_table_name}' column '{column_name}': {error}")]
    ValueDecodeError {
        /// Name of the column whose value failed to decode.
        column_name: String,
        /// Name of the table that contains the column.
        qualified_table_name: String,
        /// Description of the decoding failure, printed verbatim.
        error: String,
    },
    /// One or more columns use unsupported data types.
    /// The columns are rendered with their `Debug` representation in the message.
    #[error("unsupported data types: {columns:?}")]
    UnsupportedDataTypes { columns: Vec<UnsupportedDataType> },
    /// A table contains column names that occur more than once.
    #[error("duplicated column names in table '{qualified_table_name}': {columns:?}")]
    DuplicatedColumnNames {
        /// Name of the table with the duplicated columns.
        qualified_table_name: String,
        /// The duplicated column names.
        columns: Vec<String>,
    },
    /// A MySQL system setting did not have the expected value.
    #[error("invalid mysql system setting '{setting}'. Expected '{expected}'. Got '{actual}'.")]
    InvalidSystemSetting {
        /// Name of the system setting that was checked.
        setting: String,
        /// Value the setting was expected to have.
        expected: String,
        /// Value the setting actually had.
        actual: String,
    },
    /// Any other error we bail on. Displayed transparently as the wrapped `anyhow::Error`.
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
    /// A `mysql_async` error, displayed transparently.
    #[error(transparent)]
    MySql(#[from] mysql_async::Error),
    /// A connection attempt timed out; carries the duration reported in the message.
    #[error("connection attempt timed out after {0:?}")]
    ConnectionTimeout(Duration),
    /// Error retrieving an AWS authorization token, displayed transparently.
    #[error(transparent)]
    AwsTokenError(#[from] RdsTokenError),
}
126
/// Quotes a MySQL identifier: the result is wrapped in backticks and every backtick inside
/// `identifier` is doubled.
///
/// See [MySQL quote_identifier()](https://github.com/mysql/mysql-sys/blob/master/functions/quote_identifier.sql).
pub fn quote_identifier(identifier: &str) -> String {
    // Room for the identifier plus the two surrounding backticks; embedded backticks
    // make the string grow beyond this.
    let mut quoted = String::with_capacity(identifier.len() + 2);
    quoted.push('`');
    for ch in identifier.chars() {
        if ch == '`' {
            // Escape an embedded backtick by emitting it twice.
            quoted.push('`');
        }
        quoted.push(ch);
    }
    quoted.push('`');
    quoted
}
134
// NOTE: this error was renamed between MySQL 5.7 and 8.0
// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_source_fatal_error_reading_binlog
// https://dev.mysql.com/doc/mysql-errors/5.7/en/server-error-reference.html#error_er_master_fatal_error_reading_binlog
/// Numeric code of the MySQL server error `ER_SOURCE_FATAL_ERROR_READING_BINLOG`
/// (listed as `ER_MASTER_FATAL_ERROR_READING_BINLOG` in the MySQL 5.7 error reference).
pub const ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE: u16 = 1236;

// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_no_such_table
/// Numeric code of the MySQL server error `ER_NO_SUCH_TABLE`.
pub const ER_NO_SUCH_TABLE: u16 = 1146;
142
#[cfg(test)]
mod tests {
    use super::quote_identifier;

    /// Identifiers must come back wrapped in backticks with embedded backticks doubled.
    #[mz_ore::test]
    fn test_identifier_quoting() {
        // (raw identifier, expected quoted form)
        let cases = [
            ("a", "`a`"),
            ("naughty`sql", "`naughty``sql`"),
            ("`;naughty;sql;`", "```;naughty;sql;```"),
        ];
        let (raw, expected): (Vec<&str>, Vec<&str>) = cases.iter().copied().unzip();
        let quoted: Vec<String> = raw.into_iter().map(quote_identifier).collect();
        assert_eq!(expected, quoted);
    }
}