// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
910use std::collections::{BTreeMap, BTreeSet};
1112use mysql_async::prelude::Queryable;
13use mz_mysql_util::{MySqlError, SchemaRequest, schema_info};
1415use super::{DefiniteError, MySqlTableName, SourceOutputInfo};
1617/// Given a map of tables and the expected schemas of their outputs, retrieve the current
18/// schema for each and verify they are compatible with the expected schema.
19///
20/// Returns a vec of outputs that have incompatible schema changes.
21pub(super) async fn verify_schemas<'a, Q, I>(
22 conn: &mut Q,
23 expected: BTreeMap<&'a MySqlTableName, I>,
24) -> Result<Vec<(&'a SourceOutputInfo, DefiniteError)>, MySqlError>
25where
26Q: Queryable,
27 I: IntoIterator<Item = &'a SourceOutputInfo>,
28{
29let cur_schemas: BTreeMap<_, _> = schema_info(
30 conn,
31&SchemaRequest::Tables(
32 expected
33 .iter()
34 .map(|(f, _)| (f.0.as_str(), f.1.as_str()))
35 .collect(),
36 ),
37 )
38 .await?
39.into_iter()
40 .map(|schema| {
41 (
42 MySqlTableName::new(&schema.schema_name, &schema.name),
43 schema,
44 )
45 })
46 .collect();
4748Ok(expected
49 .into_iter()
50 .flat_map(|(table, outputs)| {
51// For each output for this upstream table, verify that the existing output
52 // desc is compatible with the new desc.
53outputs
54 .into_iter()
55 .filter_map(|output| match &cur_schemas.get(table) {
56None => Some((output, DefiniteError::TableDropped(table.to_string()))),
57Some(schema) => {
58let new_desc = (*schema).clone().to_desc(
59Some(&BTreeSet::from_iter(
60 output.text_columns.iter().map(|s| s.as_str()),
61 )),
62Some(&BTreeSet::from_iter(
63 output.exclude_columns.iter().map(|s| s.as_str()),
64 )),
65 );
66match new_desc {
67Ok(desc) => match output.desc.determine_compatibility(&desc) {
68Ok(()) => None,
69Err(err) => Some((
70 output,
71 DefiniteError::IncompatibleSchema(err.to_string()),
72 )),
73 },
74Err(err) => {
75Some((output, DefiniteError::IncompatibleSchema(err.to_string())))
76 }
77 }
78 }
79 })
80 })
81 .collect())
82}