mz_storage/source/mysql/schemas.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11
12use mysql_async::prelude::Queryable;
13use mz_mysql_util::{MySqlError, SchemaRequest, schema_info};
14
15use super::{DefiniteError, MySqlTableName, SourceOutputInfo};
16
17/// Given a map of tables and the expected schemas of their outputs, retrieve the current
18/// schema for each and verify they are compatible with the expected schema.
19///
20/// Returns a vec of outputs that have incompatible schema changes.
21pub(super) async fn verify_schemas<'a, Q, I>(
22 conn: &mut Q,
23 expected: BTreeMap<&'a MySqlTableName, I>,
24) -> Result<Vec<(&'a SourceOutputInfo, DefiniteError)>, MySqlError>
25where
26 Q: Queryable,
27 I: IntoIterator<Item = &'a SourceOutputInfo>,
28{
29 let cur_schemas: BTreeMap<_, _> = schema_info(
30 conn,
31 &SchemaRequest::Tables(
32 expected
33 .iter()
34 .map(|(f, _)| (f.0.as_str(), f.1.as_str()))
35 .collect(),
36 ),
37 )
38 .await?
39 .into_iter()
40 .map(|schema| {
41 (
42 MySqlTableName::new(&schema.schema_name, &schema.name),
43 schema,
44 )
45 })
46 .collect();
47
48 Ok(expected
49 .into_iter()
50 .flat_map(|(table, outputs)| {
51 // For each output for this upstream table, verify that the existing output
52 // desc is compatible with the new desc.
53 outputs
54 .into_iter()
55 .filter_map(|output| match &cur_schemas.get(table) {
56 None => Some((output, DefiniteError::TableDropped(table.to_string()))),
57 Some(schema) => {
58 let new_desc = (*schema).clone().to_desc(
59 Some(&BTreeSet::from_iter(
60 output.text_columns.iter().map(|s| s.as_str()),
61 )),
62 Some(&BTreeSet::from_iter(
63 output.exclude_columns.iter().map(|s| s.as_str()),
64 )),
65 );
66 match new_desc {
67 Ok(desc) => {
68 match output
69 .desc
70 .determine_compatibility(&desc, output.binlog_full_metadata)
71 {
72 Ok(()) => None,
73 Err(err) => Some((
74 output,
75 DefiniteError::IncompatibleSchema(err.to_string()),
76 )),
77 }
78 }
79 Err(err) => {
80 Some((output, DefiniteError::IncompatibleSchema(err.to_string())))
81 }
82 }
83 }
84 })
85 })
86 .collect())
87}