mz_mysql_util/
lib.rs
1mod tunnel;
13use std::time::Duration;
14
15use aws_rds::RdsTokenError;
16pub use tunnel::{
17 Config, DEFAULT_CONNECT_TIMEOUT, DEFAULT_SNAPSHOT_LOCK_WAIT_TIMEOUT,
18 DEFAULT_SNAPSHOT_MAX_EXECUTION_TIME, DEFAULT_TCP_KEEPALIVE, MySqlConn, TimeoutConfig,
19 TunnelConfig,
20};
21
22mod desc;
23pub use desc::{
24 MySqlColumnDesc, MySqlKeyDesc, MySqlTableDesc, ProtoMySqlColumnDesc, ProtoMySqlKeyDesc,
25 ProtoMySqlTableDesc,
26};
27
28mod replication;
29pub use replication::{
30 ensure_full_row_binlog_format, ensure_gtid_consistency, ensure_replication_commit_order,
31 query_sys_var,
32};
33
34pub mod schemas;
35pub use schemas::{
36 MySqlTableSchema, QualifiedTableRef, SYSTEM_SCHEMAS, SchemaRequest, schema_info,
37};
38
39pub mod privileges;
40pub use privileges::validate_source_privileges;
41
42pub mod decoding;
43pub use decoding::pack_mysql_row;
44mod aws_rds;
45
/// Describes a table column whose data type is reported as unsupported.
///
/// The [`Display`](std::fmt::Display) rendering is
/// `'<qualified_table_name>.<column_name>' of type '<column_type>'`, with
/// ` represented as: '<intended_type>'` appended when `intended_type` is set.
#[derive(Debug, Clone)]
pub struct UnsupportedDataType {
    /// Type of the column, printed after `of type` in the rendered message.
    pub column_type: String,
    /// Name of the table holding the column (presumably schema-qualified — confirm with callers).
    pub qualified_table_name: String,
    /// Name of the column.
    pub column_name: String,
    /// Optional representation the column was intended to use; only affects the rendered
    /// message when present.
    pub intended_type: Option<String>,
}

impl std::fmt::Display for UnsupportedDataType {
    /// Writes the column identity and type, plus the intended representation if there is one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            column_type,
            qualified_table_name,
            column_name,
            intended_type,
        } = self;

        // Long form first: only taken when an intended representation was recorded.
        if let Some(representation) = intended_type {
            return write!(
                f,
                "'{}.{}' of type '{}' represented as: '{}'",
                qualified_table_name, column_name, column_type, representation
            );
        }

        write!(
            f,
            "'{}.{}' of type '{}'",
            qualified_table_name, column_name, column_type
        )
    }
}
70
/// Records a single privilege that was found to be missing for a table.
///
/// Renders via [`Display`](std::fmt::Display) as
/// `Missing privilege '<privilege>' for '<qualified_table_name>'`.
#[derive(Debug, Clone)]
pub struct MissingPrivilege {
    /// Name of the missing privilege.
    pub privilege: String,
    /// Name of the table the privilege is missing for (presumably schema-qualified — confirm
    /// with callers).
    pub qualified_table_name: String,
}

impl std::fmt::Display for MissingPrivilege {
    /// Writes the one-line "missing privilege" message for this entry.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            privilege,
            qualified_table_name,
        } = self;
        f.write_fmt(format_args!(
            "Missing privilege '{}' for '{}'",
            privilege, qualified_table_name
        ))
    }
}
86
/// Error type exported by this module.
///
/// `Display` and `std::error::Error` are derived through `thiserror`; each variant's
/// `#[error(...)]` attribute is its user-visible message. Variants marked `#[from]` can be
/// produced with `?` from the wrapped error type.
#[derive(Debug, thiserror::Error)]
pub enum MySqlError {
    /// Privilege validation failed; carries the list of missing privileges, which the message
    /// prints using their `Debug` form.
    #[error("error validating privileges: {0:?}")]
    MissingPrivileges(Vec<MissingPrivilege>),
    /// A MySQL connection could not be created from the given client configuration; the
    /// payload is interpolated into the message.
    #[error("error creating mysql connection with config: {0}")]
    InvalidClientConfig(String),
    /// Setting up SSH failed; the wrapped error is also exposed as the error `source`.
    #[error("error setting up ssh: {0}")]
    Ssh(#[source] anyhow::Error),
    /// A value could not be decoded for the named column of the named table.
    #[error("error decoding value for '{qualified_table_name}' column '{column_name}': {error}")]
    ValueDecodeError {
        /// Column whose value failed to decode.
        column_name: String,
        /// Table containing that column.
        qualified_table_name: String,
        /// Description of the decoding failure.
        error: String,
    },
    /// One or more columns have unsupported data types; the message prints them using their
    /// `Debug` form.
    #[error("unsupported data types: {columns:?}")]
    UnsupportedDataTypes { columns: Vec<UnsupportedDataType> },
    /// A table contains repeated column names.
    #[error("duplicated column names in table '{qualified_table_name}': {columns:?}")]
    DuplicatedColumnNames {
        /// Table in which the duplicates were found.
        qualified_table_name: String,
        /// The column names reported as duplicated.
        columns: Vec<String>,
    },
    /// A MySQL system setting did not have the expected value.
    #[error("invalid mysql system setting '{setting}'. Expected '{expected}'. Got '{actual}'.")]
    InvalidSystemSetting {
        /// Name of the setting.
        setting: String,
        /// Value that was expected.
        expected: String,
        /// Value that was observed.
        actual: String,
    },
    /// Any other `anyhow::Error`; its message and source are forwarded unchanged
    /// (`transparent`).
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
    /// An error from the `mysql_async` client, forwarded unchanged (`transparent`).
    #[error(transparent)]
    MySql(#[from] mysql_async::Error),
    /// A connection attempt did not finish within the given duration.
    #[error("connection attempt timed out after {0:?}")]
    ConnectionTimeout(Duration),
    /// An `RdsTokenError` from the `aws_rds` module, forwarded unchanged (`transparent`).
    #[error(transparent)]
    AwsTokenError(#[from] RdsTokenError),
}
126
/// Quotes `identifier` as a backtick-delimited identifier.
///
/// The result is the input wrapped in a leading and trailing backtick, with every backtick
/// inside the input doubled so it cannot terminate the quoted identifier early.
pub fn quote_identifier(identifier: &str) -> String {
    // Two extra bytes for the surrounding backticks; embedded backticks may grow it further.
    let mut quoted = String::with_capacity(identifier.len() + 2);
    quoted.push('`');
    for ch in identifier.chars() {
        if ch == '`' {
            // Emit the escape half of the doubled backtick.
            quoted.push('`');
        }
        quoted.push(ch);
    }
    quoted.push('`');
    quoted
}
134
/// Numeric error code `1236`, named for the MySQL server error
/// `ER_SOURCE_FATAL_ERROR_READING_BINLOG`.
// NOTE(review): the code-to-name mapping is taken from the constant's name; confirm against
// the MySQL server error reference.
pub const ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE: u16 = 1236;

/// Numeric error code `1146`, named for the MySQL server error `ER_NO_SUCH_TABLE`.
// NOTE(review): the code-to-name mapping is taken from the constant's name; confirm against
// the MySQL server error reference.
pub const ER_NO_SUCH_TABLE: u16 = 1146;
142
#[cfg(test)]
mod tests {
    use super::quote_identifier;

    /// Checks that identifiers are wrapped in backticks and embedded backticks are doubled.
    #[mz_ore::test]
    fn test_identifier_quoting() {
        // (raw identifier, expected quoted form)
        let cases = [
            ("a", "`a`"),
            ("naughty`sql", "`naughty``sql`"),
            ("`;naughty;sql;`", "```;naughty;sql;```"),
        ];
        for (raw_str, expected) in cases {
            assert_eq!(expected, quote_identifier(raw_str));
        }
    }
}