mz_storage/source/mysql/
schemas.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11
12use mysql_async::prelude::Queryable;
13use mz_mysql_util::{MySqlError, SchemaRequest, schema_info};
14
15use super::{DefiniteError, MySqlTableName, SourceOutputInfo};
16
17/// Given a map of tables and the expected schemas of their outputs, retrieve the current
18/// schema for each and verify they are compatible with the expected schema.
19///
20/// Returns a vec of outputs that have incompatible schema changes.
21pub(super) async fn verify_schemas<'a, Q, I>(
22    conn: &mut Q,
23    expected: BTreeMap<&'a MySqlTableName, I>,
24) -> Result<Vec<(&'a SourceOutputInfo, DefiniteError)>, MySqlError>
25where
26    Q: Queryable,
27    I: IntoIterator<Item = &'a SourceOutputInfo>,
28{
29    let cur_schemas: BTreeMap<_, _> = schema_info(
30        conn,
31        &SchemaRequest::Tables(
32            expected
33                .iter()
34                .map(|(f, _)| (f.0.as_str(), f.1.as_str()))
35                .collect(),
36        ),
37    )
38    .await?
39    .into_iter()
40    .map(|schema| {
41        (
42            MySqlTableName::new(&schema.schema_name, &schema.name),
43            schema,
44        )
45    })
46    .collect();
47
48    Ok(expected
49        .into_iter()
50        .flat_map(|(table, outputs)| {
51            // For each output for this upstream table, verify that the existing output
52            // desc is compatible with the new desc.
53            outputs
54                .into_iter()
55                .filter_map(|output| match &cur_schemas.get(table) {
56                    None => Some((output, DefiniteError::TableDropped(table.to_string()))),
57                    Some(schema) => {
58                        let new_desc = (*schema).clone().to_desc(
59                            Some(&BTreeSet::from_iter(
60                                output.text_columns.iter().map(|s| s.as_str()),
61                            )),
62                            Some(&BTreeSet::from_iter(
63                                output.exclude_columns.iter().map(|s| s.as_str()),
64                            )),
65                        );
66                        match new_desc {
67                            Ok(desc) => match output.desc.determine_compatibility(&desc) {
68                                Ok(()) => None,
69                                Err(err) => Some((
70                                    output,
71                                    DefiniteError::IncompatibleSchema(err.to_string()),
72                                )),
73                            },
74                            Err(err) => {
75                                Some((output, DefiniteError::IncompatibleSchema(err.to_string())))
76                            }
77                        }
78                    }
79                })
80        })
81        .collect())
82}