1use std::collections::{BTreeMap, BTreeSet};
11use std::sync::LazyLock;
12
13use itertools::Itertools;
14use maplit::btreeset;
15use regex::Regex;
16
17use mysql_async::prelude::{FromRow, Queryable};
18use mysql_async::{FromRowError, Row};
19
20use mz_repr::adt::char::CharLength;
21use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
22use mz_repr::adt::timestamp::TimestampPrecision;
23use mz_repr::adt::varchar::VarCharMaxLength;
24use mz_repr::{ColumnType, ScalarType};
25
26use crate::desc::{
27 MySqlColumnDesc, MySqlColumnMeta, MySqlColumnMetaEnum, MySqlKeyDesc, MySqlTableDesc,
28};
29use crate::{MySqlError, UnsupportedDataType};
30
31pub static SYSTEM_SCHEMAS: LazyLock<BTreeSet<&str>> = LazyLock::new(|| {
34 btreeset! {
35 "information_schema",
36 "performance_schema",
37 "mysql",
38 "sys",
39 }
40});
41
/// The `information_schema.columns` fields fetched for every table.
///
/// Order matters: [`InfoSchema`]'s `FromRow` implementation decodes a row positionally
/// and checks the returned column names against this list. The column query in
/// `schema_info` aliases each field to the exact name listed here (`col AS col`), so the
/// server reports names that match this list.
const INFO_SCHEMA_COLS: &[&str] = &[
    "column_name",
    "data_type",
    "column_type",
    "is_nullable",
    "numeric_precision",
    "numeric_scale",
    "datetime_precision",
    "character_maximum_length",
];
54
/// One row of `information_schema.columns`, limited to the fields in [`INFO_SCHEMA_COLS`].
///
/// Fields are declared in the same order as `INFO_SCHEMA_COLS`.
#[derive(Debug, Clone)]
pub struct InfoSchema {
    /// `COLUMN_NAME`: the column's name.
    column_name: String,
    /// `DATA_TYPE`: the bare type name, e.g. `int` or `varchar`.
    data_type: String,
    /// `COLUMN_TYPE`: the full declaration, e.g. `int unsigned` or `enum('a','b')`.
    column_type: String,
    /// `IS_NULLABLE`: `"YES"` when the column accepts NULL.
    is_nullable: String,
    /// `NUMERIC_PRECISION`, for numeric (and `bit`) columns.
    numeric_precision: Option<i64>,
    /// `NUMERIC_SCALE`, for numeric columns.
    numeric_scale: Option<i64>,
    /// `DATETIME_PRECISION`, for temporal columns.
    datetime_precision: Option<i64>,
    /// `CHARACTER_MAXIMUM_LENGTH`, for character columns.
    character_maximum_length: Option<i64>,
}
67
68impl FromRow for InfoSchema {
69 fn from_row_opt(row: Row) -> Result<Self, FromRowError> {
70 let actual = row.columns_ref().iter().map(|c| c.name_ref());
71 let expected = INFO_SCHEMA_COLS.iter().map(|c| c.as_bytes());
72 itertools::assert_equal(actual, expected);
73 let (a, b, c, d, e, f, g, h) = FromRow::from_row_opt(row)?;
74 Ok(Self {
75 column_name: a,
76 data_type: b,
77 column_type: c,
78 is_nullable: d,
79 numeric_precision: e,
80 numeric_scale: f,
81 datetime_precision: g,
82 character_maximum_length: h,
83 })
84 }
85}
86
87impl InfoSchema {
88 pub fn name(self) -> String {
89 self.column_name
90 }
91}
92
93#[derive(Debug, Clone)]
95pub struct MySqlTableSchema {
96 pub schema_name: String,
97 pub name: String,
98 pub columns: Vec<InfoSchema>,
99 pub keys: BTreeSet<MySqlKeyDesc>,
100}
101
102impl MySqlTableSchema {
103 pub fn table_ref<'a>(&'a self) -> QualifiedTableRef<'a> {
104 QualifiedTableRef {
105 schema_name: &self.schema_name,
106 table_name: &self.name,
107 }
108 }
109
110 pub fn to_desc(
113 self,
114 text_columns: Option<&BTreeSet<&str>>,
115 exclude_columns: Option<&BTreeSet<&str>>,
116 ) -> Result<MySqlTableDesc, MySqlError> {
117 match (&text_columns, &exclude_columns) {
119 (Some(text_cols), Some(ignore_cols)) => {
120 let intersection: Vec<_> = text_cols.intersection(ignore_cols).collect();
121 if !intersection.is_empty() {
122 Err(MySqlError::DuplicatedColumnNames {
123 qualified_table_name: format!("{:?}.{:?}", self.schema_name, self.name),
124 columns: intersection.iter().map(|s| (*s).to_string()).collect(),
125 })?;
126 }
127 }
128 _ => (),
129 };
130
131 let mut columns = Vec::with_capacity(self.columns.len());
132 let mut error_cols = vec![];
133 for info in self.columns {
134 if let Some(text_columns) = &text_columns {
137 if text_columns.contains(&info.column_name.as_str()) {
138 match parse_as_text_column(&info, &self.schema_name, &self.name) {
139 Err(err) => error_cols.push(err),
140 Ok((scalar_type, meta)) => columns.push(MySqlColumnDesc {
141 name: info.column_name,
142 column_type: Some(ColumnType {
143 scalar_type,
144 nullable: &info.is_nullable == "YES",
145 }),
146 meta,
147 }),
148 }
149 continue;
150 }
151 }
152
153 if let Some(ignore_cols) = &exclude_columns {
155 if ignore_cols.contains(&info.column_name.as_str()) {
156 columns.push(MySqlColumnDesc {
157 name: info.column_name,
158 column_type: None,
159 meta: None,
160 });
161 continue;
162 }
163 }
164
165 match parse_data_type(&info, &self.schema_name, &self.name) {
167 Err(err) => error_cols.push(err),
168 Ok((scalar_type, meta)) => columns.push(MySqlColumnDesc {
169 name: info.column_name,
170 column_type: Some(ColumnType {
171 scalar_type,
172 nullable: &info.is_nullable == "YES",
173 }),
174 meta,
175 }),
176 }
177 }
178 if error_cols.len() > 0 {
179 Err(MySqlError::UnsupportedDataTypes {
180 columns: error_cols,
181 })?;
182 }
183
184 Ok(MySqlTableDesc {
185 schema_name: self.schema_name,
186 name: self.name,
187 columns,
188 keys: self.keys,
189 })
190 }
191}
192
/// Selects which base tables [`schema_info`] describes.
pub enum SchemaRequest<'a> {
    /// Every base table outside the schemas listed in [`SYSTEM_SCHEMAS`].
    All,
    /// Every base table, including those in [`SYSTEM_SCHEMAS`].
    AllWithSystemSchemas,
    /// Every base table in any of the listed schemas.
    /// An empty list yields no tables.
    Schemas(Vec<&'a str>),
    /// Exactly the listed `(schema, table)` pairs that exist as base tables.
    /// An empty list yields no tables.
    Tables(Vec<(&'a str, &'a str)>),
}
206
/// A borrowed `(schema, table)` name pair identifying one MySQL table.
///
/// The derived ordering compares `schema_name` first, then `table_name`.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct QualifiedTableRef<'a> {
    /// Schema (database) containing the table.
    pub schema_name: &'a str,
    /// The table's name.
    pub table_name: &'a str,
}
213
214pub async fn schema_info<'a, Q>(
216 conn: &mut Q,
217 schema_request: &SchemaRequest<'a>,
218) -> Result<Vec<MySqlTableSchema>, MySqlError>
219where
220 Q: Queryable,
221{
222 let table_rows: Vec<(String, String)> = match schema_request {
223 SchemaRequest::All => {
224 let table_q = format!(
225 "SELECT table_name, table_schema
226 FROM information_schema.tables
227 WHERE table_type = 'BASE TABLE'
228 AND table_schema NOT IN ({})",
229 SYSTEM_SCHEMAS.iter().map(|s| format!("'{}'", s)).join(", ")
230 );
231 conn.exec(table_q, ()).await?
232 }
233 SchemaRequest::AllWithSystemSchemas => {
234 let table_q = "SELECT table_name, table_schema
235 FROM information_schema.tables
236 WHERE table_type = 'BASE TABLE'";
237 conn.exec(table_q, ()).await?
238 }
239 SchemaRequest::Schemas(schemas) => {
240 if schemas.is_empty() {
242 return Ok(vec![]);
243 }
244 let table_q = format!(
245 "SELECT table_name, table_schema
246 FROM information_schema.tables
247 WHERE table_type = 'BASE TABLE'
248 AND table_schema IN ({})",
249 schemas.iter().map(|_| "?").join(", ")
250 );
251 conn.exec(table_q, schemas).await?
252 }
253 SchemaRequest::Tables(tables) => {
254 if tables.is_empty() {
256 return Ok(vec![]);
257 }
258 let table_q = format!(
259 "SELECT table_name, table_schema
260 FROM information_schema.tables
261 WHERE table_type = 'BASE TABLE'
262 AND (table_schema, table_name) IN ({})",
263 tables.iter().map(|_| "(?, ?)").join(", ")
264 );
265 conn.exec(
266 table_q,
267 tables
268 .iter()
269 .flat_map(|(s, t)| [*s, *t])
270 .collect::<Vec<_>>(),
271 )
272 .await?
273 }
274 };
275
276 let mut tables = vec![];
277 for (table_name, schema_name) in table_rows {
278 let column_q = format!(
281 "SELECT {}
282 FROM information_schema.columns
283 WHERE table_name = ? AND table_schema = ?
284 ORDER BY ordinal_position ASC",
285 INFO_SCHEMA_COLS
286 .iter()
287 .map(|c| format!("{c} AS {c}"))
288 .join(", ")
289 );
290 let column_rows = conn
291 .exec::<InfoSchema, _, _>(column_q, (&table_name, &schema_name))
292 .await?;
293
294 let index_rows = conn
297 .exec::<(String, String), _, _>(
298 "SELECT
299 index_name,
300 column_name
301 FROM information_schema.statistics AS outt
302 WHERE
303 table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys')
304 AND NOT EXISTS (
305 SELECT 1
306 FROM information_schema.statistics AS inn
307 WHERE outt.index_name = inn.index_name AND inn.column_name IS NULL
308 )
309 AND non_unique = 0
310 AND table_name = ?
311 AND table_schema = ?
312 ORDER BY index_name, seq_in_index
313 ",
314 (&table_name, &schema_name),
315 )
316 .await?;
317
318 let mut indices = BTreeMap::new();
319 for (index_name, column) in index_rows {
320 indices
321 .entry(index_name)
322 .or_insert_with(Vec::new)
323 .push(column);
324 }
325 let mut keys = BTreeSet::new();
326 while let Some((index_name, columns)) = indices.pop_first() {
327 keys.insert(MySqlKeyDesc {
328 is_primary: &index_name == "PRIMARY",
329 name: index_name,
330 columns,
331 });
332 }
333
334 tables.push(MySqlTableSchema {
335 schema_name,
336 name: table_name,
337 columns: column_rows,
338 keys,
339 });
340 }
341
342 Ok(tables)
343}
344
345fn parse_data_type(
346 info: &InfoSchema,
347 schema_name: &str,
348 table_name: &str,
349) -> Result<(ScalarType, Option<MySqlColumnMeta>), UnsupportedDataType> {
350 let unsigned = info.column_type.contains("unsigned");
351
352 let scalar_type = match info.data_type.as_str() {
353 "tinyint" | "smallint" => {
354 if unsigned {
355 ScalarType::UInt16
356 } else {
357 ScalarType::Int16
358 }
359 }
360 "mediumint" | "int" => {
361 if unsigned {
362 ScalarType::UInt32
363 } else {
364 ScalarType::Int32
365 }
366 }
367 "bigint" => {
368 if unsigned {
369 ScalarType::UInt64
370 } else {
371 ScalarType::Int64
372 }
373 }
374 "float" => ScalarType::Float32,
375 "double" => ScalarType::Float64,
376 "date" => ScalarType::Date,
377 "datetime" | "timestamp" => ScalarType::Timestamp {
378 precision: info
381 .datetime_precision
382 .map(TimestampPrecision::try_from)
383 .transpose()
384 .map_err(|_| UnsupportedDataType {
385 column_type: info.column_type.clone(),
386 qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
387 column_name: info.column_name.clone(),
388 intended_type: None,
389 })?,
390 },
391 "time" => ScalarType::Time,
392 "decimal" | "numeric" => {
393 if info.numeric_precision.unwrap_or_default() > NUMERIC_DATUM_MAX_PRECISION.into() {
397 Err(UnsupportedDataType {
398 column_type: info.column_type.clone(),
399 qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
400 column_name: info.column_name.clone(),
401 intended_type: None,
402 })?
403 }
404 ScalarType::Numeric {
405 max_scale: info
406 .numeric_scale
407 .map(NumericMaxScale::try_from)
408 .transpose()
409 .map_err(|_| UnsupportedDataType {
410 column_type: info.column_type.clone(),
411 qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
412 column_name: info.column_name.clone(),
413 intended_type: None,
414 })?,
415 }
416 }
417 "char" => ScalarType::Char {
418 length: info
419 .character_maximum_length
420 .and_then(|f| Some(CharLength::try_from(f)))
421 .transpose()
422 .map_err(|_| UnsupportedDataType {
423 column_type: info.column_type.clone(),
424 qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
425 column_name: info.column_name.clone(),
426 intended_type: None,
427 })?,
428 },
429 "varchar" => ScalarType::VarChar {
430 max_length: info
431 .character_maximum_length
432 .and_then(|f| Some(VarCharMaxLength::try_from(f)))
433 .transpose()
434 .map_err(|_| UnsupportedDataType {
435 column_type: info.column_type.clone(),
436 qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
437 column_name: info.column_name.clone(),
438 intended_type: None,
439 })?,
440 },
441 "text" | "tinytext" | "mediumtext" | "longtext" => ScalarType::String,
442 "binary" | "varbinary" | "tinyblob" | "blob" | "mediumblob" | "longblob" => {
443 ScalarType::Bytes
444 }
445 "json" => ScalarType::Jsonb,
446 "bit" => {
448 let precision = match info.numeric_precision {
449 Some(x @ 0..=64) => u32::try_from(x).expect("known good value"),
450 prec => {
451 mz_ore::soft_panic_or_log!(
452 "found invalid bit precision, {prec:?}, falling back"
453 );
454 64u32
455 }
456 };
457 return Ok((ScalarType::UInt64, Some(MySqlColumnMeta::Bit(precision))));
458 }
459 typ => {
460 tracing::warn!(?typ, "found unsupported data type");
461 return Err(UnsupportedDataType {
462 column_type: info.column_type.clone(),
463 qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
464 column_name: info.column_name.clone(),
465 intended_type: None,
466 });
467 }
468 };
469
470 Ok((scalar_type, None))
471}
472
473fn parse_as_text_column(
477 info: &InfoSchema,
478 schema_name: &str,
479 table_name: &str,
480) -> Result<(ScalarType, Option<MySqlColumnMeta>), UnsupportedDataType> {
481 match info.data_type.as_str() {
482 "year" => Ok((ScalarType::String, Some(MySqlColumnMeta::Year))),
483 "json" => Ok((ScalarType::String, Some(MySqlColumnMeta::Json))),
484 "enum" => Ok((
485 ScalarType::String,
486 Some(MySqlColumnMeta::Enum(MySqlColumnMetaEnum {
487 values: enum_vals_from_column_type(info.column_type.as_str()).map_err(|_| {
488 UnsupportedDataType {
489 column_type: info.column_type.clone(),
490 qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
491 column_name: info.column_name.clone(),
492 intended_type: Some("text".to_string()),
493 }
494 })?,
495 })),
496 )),
497 "date" => Ok((ScalarType::String, Some(MySqlColumnMeta::Date))),
498 "datetime" | "timestamp" => Ok((
499 ScalarType::String,
500 Some(MySqlColumnMeta::Timestamp(
501 info.datetime_precision
502 .unwrap_or_default()
504 .try_into()
505 .map_err(|_| UnsupportedDataType {
506 column_type: info.column_type.clone(),
507 qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
508 column_name: info.column_name.clone(),
509 intended_type: Some("text".to_string()),
510 })?,
511 )),
512 )),
513 _ => Err(UnsupportedDataType {
514 column_type: info.column_type.clone(),
515 qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
516 column_name: info.column_name.clone(),
517 intended_type: Some("text".to_string()),
518 }),
519 }
520}
521
522static ENUM_VAL_REGEX: LazyLock<Regex> =
523 LazyLock::new(|| Regex::new(r"'((?:[^']|'')*)'").expect("valid regex"));
524
525fn enum_vals_from_column_type(s: &str) -> Result<Vec<String>, anyhow::Error> {
530 let vals_str = s
531 .strip_prefix("enum(")
532 .and_then(|s| s.strip_suffix(')'))
533 .ok_or_else(|| anyhow::format_err!("Unable to parse enum column type string"))?;
534
535 Ok(ENUM_VAL_REGEX
536 .captures_iter(vals_str)
537 .map(|s| s[1].replace("''", "'"))
538 .collect())
539}
540
#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_enum_value_parsing() {
        let vals =
            enum_vals_from_column_type("enum('apple','banana','cher,ry','ora''nge')").unwrap();
        assert_eq!(vals, vec!["apple", "banana", "cher,ry", "ora'nge"]);
    }

    #[mz_ore::test]
    fn test_enum_value_parsing_edge_cases() {
        // Empty members, and a doubled quote at the very end of a member.
        let vals = enum_vals_from_column_type("enum('','it''s','a''')").unwrap();
        assert_eq!(vals, vec!["", "it's", "a'"]);

        // An enum with no members yields no values rather than an error.
        assert_eq!(
            enum_vals_from_column_type("enum()").unwrap(),
            Vec::<String>::new()
        );

        // Anything that is not a complete `enum(...)` type is rejected.
        assert!(enum_vals_from_column_type("set('a','b')").is_err());
        assert!(enum_vals_from_column_type("enum('a'").is_err());
    }
}