mz_storage/source/mysql/
schemas.rs1use std::collections::{BTreeMap, BTreeSet};
11
12use mysql_async::prelude::Queryable;
13use mz_mysql_util::{MySqlError, SchemaRequest, schema_info};
14
15use super::{DefiniteError, MySqlTableName, SourceOutputInfo};
16
17pub(super) async fn verify_schemas<'a, Q, I>(
22 conn: &mut Q,
23 expected: BTreeMap<&'a MySqlTableName, I>,
24) -> Result<Vec<(&'a SourceOutputInfo, DefiniteError)>, MySqlError>
25where
26 Q: Queryable,
27 I: IntoIterator<Item = &'a SourceOutputInfo>,
28{
29 let cur_schemas: BTreeMap<_, _> = schema_info(
30 conn,
31 &SchemaRequest::Tables(
32 expected
33 .iter()
34 .map(|(f, _)| (f.0.as_str(), f.1.as_str()))
35 .collect(),
36 ),
37 )
38 .await?
39 .into_iter()
40 .map(|schema| {
41 (
42 MySqlTableName::new(&schema.schema_name, &schema.name),
43 schema,
44 )
45 })
46 .collect();
47
48 Ok(expected
49 .into_iter()
50 .flat_map(|(table, outputs)| {
51 outputs
54 .into_iter()
55 .filter_map(|output| match &cur_schemas.get(table) {
56 None => Some((output, DefiniteError::TableDropped(table.to_string()))),
57 Some(schema) => {
58 let new_desc = (*schema).clone().to_desc(
59 Some(&BTreeSet::from_iter(
60 output.text_columns.iter().map(|s| s.as_str()),
61 )),
62 Some(&BTreeSet::from_iter(
63 output.exclude_columns.iter().map(|s| s.as_str()),
64 )),
65 );
66 match new_desc {
67 Ok(desc) => match output.desc.determine_compatibility(&desc) {
68 Ok(()) => None,
69 Err(err) => Some((
70 output,
71 DefiniteError::IncompatibleSchema(err.to_string()),
72 )),
73 },
74 Err(err) => {
75 Some((output, DefiniteError::IncompatibleSchema(err.to_string())))
76 }
77 }
78 }
79 })
80 })
81 .collect())
82}