mz_adapter/catalog/migrate.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use base64::prelude::*;
use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::{MOCK_AUTHENTICATION_NONCE_KEY, Transaction};
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{
    CreateSinkOptionName, CreateViewStatement, CteBlock, DeferredItemName, IfExistsBehavior, Query,
    SetExpr, SqlServerConfigOptionName, ViewDefinition,
};
use mz_sql::catalog::SessionCatalog;
use mz_sql::names::{FullItemName, QualifiedItemName};
use mz_sql::normalize;
use mz_sql::session::vars::{FORCE_SOURCE_TABLE_SYNTAX, Var, VarInput};
use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::sources::SourceExportStatementDetails;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use prost::Message;
use semver::Version;
use tracing::info;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;

/// Catalog key of the `migration_version` setting.
///
/// The `migration_version` tracks the version of the binary that last successfully performed and
/// committed all the catalog migrations (including builtin schema migrations). It can be used by
/// migration logic to identify the source version from which to migrate.
///
/// Note that the durable catalog also knows a `catalog_content_version`. That doesn't work for
/// this purpose as it is already bumped to the current binary version when the catalog is opened
/// in writable mode, before any migrations have run.
const MIGRATION_VERSION_KEY: &str = "migration_version";

pub(crate) fn get_migration_version(txn: &Transaction<'_>) -> Option<Version> {
    txn.get_setting(MIGRATION_VERSION_KEY.into())
        .map(|s| s.parse().expect("valid migration version"))
}

pub(crate) fn set_migration_version(
    txn: &mut Transaction<'_>,
    version: Version,
) -> Result<(), mz_catalog::durable::CatalogError> {
    txn.set_setting(MIGRATION_VERSION_KEY.into(), Some(version.to_string()))
}

fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a &ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, &cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

pub(crate) struct MigrateResult {
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    pub(crate) catalog_updates: Vec<ParsedStateUpdate>,
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

/// Migrates all user items and loads them into `state`.
///
/// Returns the builtin updates corresponding to all user items.
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    _now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = get_migration_version(tx).unwrap_or(Version::new(0, 0, 0));

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    rewrite_ast_items(tx, |tx, _id, stmt| {
        // Add per-item AST migrations below.
        //
        // Each migration should be a function that takes `stmt` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `stmt` will be staged for commit to the catalog.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        ast_rewrite_sql_server_constraints(stmt)?;
        ast_rewrite_add_missing_index_ids(tx, stmt)?;
        Ok(())
    })?;

    // Load items into catalog. We make sure to consolidate the old updates with the new updates to
    // avoid trying to apply unmigrated items.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Some migrations might introduce non-item 'post-item' updates. We sequester those so they
    // can be applied together with the other post-item updates after migrations, to avoid
    // accumulating negative diffs.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        // The only post-item update kind we currently generate is to
        // update storage collection metadata.
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();

    let force_source_table_syntax = state.system_config().force_source_table_syntax();
    // When this flag is set, the legacy syntax is denied, but we are about to perform a
    // migration that requires parsing the current catalog state. To proceed we temporarily
    // disable the flag and then reset it after migrations are done.
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("off"))
            .expect("known parameter");
    }

    let (mut ast_builtin_table_updates, mut ast_catalog_updates) =
        state.apply_updates(item_updates, local_expr_cache).await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    // Special block for the `rewrite_sources_to_tables` migration, since it is gated behind a
    // feature flag and needs to update multiple AST items at once.
    if force_source_table_syntax {
        rewrite_sources_to_tables(tx, &conn_cat)?;
    }

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        // Add per-item, post-planning AST migrations below. Most
        // migrations should be in the above `rewrite_ast_items` block.
        //
        // Each migration should be a function that takes `item` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `item` will be staged for commit to the catalog.
        //
        // Be careful if you reference `conn_cat`. Doing so is *weird*,
        // as you'll be rewriting the catalog while looking at it. If
        // possible, make your migration independent of `conn_cat`, and only
        // consider a single item at a time.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        Ok(())
    })?;

    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("on"))
            .expect("known parameter");
    }

    // Add whole-catalog migrations below.
    //
    // Each migration should be a function that takes `tx` and `conn_cat` as
    // input and stages arbitrary transformations to the catalog on `tx`.

    let op_item_updates = tx.get_and_commit_op_updates();
    let (item_builtin_table_updates, item_catalog_updates) =
        state.apply_updates(op_item_updates, local_expr_cache).await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);
    ast_catalog_updates.extend(item_catalog_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );

    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        catalog_updates: ast_catalog_updates,
        post_item_updates,
    })
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.
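//
// For illustration only (the function name, option, and version are made up), a new AST
// migration added under the heading above might look roughly like:
//
//     fn ast_rewrite_remove_some_option_v0_99_0(
//         stmt: &mut Statement<Raw>,
//     ) -> Result<(), anyhow::Error> {
//         let Statement::CreateSink(stmt) = stmt else {
//             return Ok(());
//         };
//         // ... mutate `stmt` in place ...
//         Ok(())
//     }
//
// and would be called from the `rewrite_ast_items` closure in `migrate` above.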

/// Migrates all sources to use the new sources-as-tables model.
///
/// Suppose we have an old-style source named `source_name` with global id `source_id`. The source
/// will also have an associated progress source named `progress_name` (which is almost always
/// `source_name` + "_progress") with global id `progress_id`.
///
/// We have two constraints to satisfy. The migration:
///   1. should not change the schema of a global id *if that global id maps to a
///      durable collection*. The reason for this constraint is that when a durable collection (i.e.
///      backed by a persist shard) is opened, persist will verify that the schema is the expected
///      one. If we change the Create SQL of a global id to a non-durable definition (e.g. a view),
///      then we are free to also change the schema.
///   2. should make it such that the SQL object that is constructed with a new-style `CREATE
///      SOURCE` statement contains the progress data, and all other objects related to the
///      old-style source depend on that object.
///
/// With these constraints we consider two cases.
///
/// ## Case 1: A multi-output source
///
/// Multi-output sources have a useless dummy output as the contents of `source_name`, so we
/// re-purpose that name to be the `CREATE SOURCE` statement and make `progress_name` be a view
/// of `source_name`. Since the main source is a durable object, we must move `source_name` and the
/// corresponding new-style `CREATE SOURCE` statement under `progress_id`. Then `progress_name` can
/// move to `source_id`, and since it becomes a view we are free to change its schema.
///
/// Visually, we are changing this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                 | Schema   | Durable |
/// +-------------+---------------+----------------------------+----------+---------+
/// | source_id   | source_name   | CREATE SOURCE (old-style)  | empty    | yes     |
/// | progress_id | progress_name | CREATE SUBSOURCE ..        | progress | yes     |
///
/// to this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                | Schema        | Durable |
/// +-------------+---------------+---------------------------+---------------+---------+
/// | source_id   | progress_name | CREATE VIEW               | progress data | no      |
/// | progress_id | source_name   | CREATE SOURCE (new-style) | progress data | yes     |
///
/// ## Case 2: A single-output source
///
/// Single-output sources have data as the contents of `source_name`, so we can't repurpose that
/// name to be the `CREATE SOURCE` statement. Here we leave everything intact except for the
/// Create SQL of each object. Namely, the old-style `CREATE SOURCE` statement becomes a `CREATE
/// TABLE FROM SOURCE` and the old-style `CREATE SUBSOURCE .. PROGRESS` becomes a new-style `CREATE
/// SOURCE` statement.
///
/// Visually, we are changing this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                 | Schema      | Durable |
/// +-------------+---------------+----------------------------+-------------+---------+
/// | source_id   | source_name   | CREATE SOURCE (old-style)  | source data | yes     |
/// | progress_id | progress_name | CREATE SUBSOURCE ..        | progress    | yes     |
///
/// to this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                 | Schema      | Durable |
/// +-------------+---------------+----------------------------+-------------+---------+
/// | source_id   | source_name   | CREATE TABLE FROM SOURCE   | source data | yes     |
/// | progress_id | progress_name | CREATE SOURCE (new-style)  | progress    | yes     |
///
/// ## Subsource migration
///
/// After the migration has gone over all the `CREATE SOURCE` statements, it transforms each
/// non-progress `CREATE SUBSOURCE` statement into a `CREATE TABLE FROM SOURCE` statement that
/// points to the original `source_name`, but with the altered global id (which is now
/// `progress_id`).
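///
/// For illustration only (object and connection names are made up), a single-output source
/// created with an old-style statement roughly like
///
/// ```sql
/// CREATE SOURCE kafka_src FROM KAFKA CONNECTION kafka_conn (TOPIC 'events');
/// ```
///
/// ends up, after the migration, looking approximately like
///
/// ```sql
/// -- `kafka_src_progress` keeps `progress_id` and holds the new-style `CREATE SOURCE`.
/// CREATE SOURCE kafka_src_progress FROM KAFKA CONNECTION kafka_conn (TOPIC 'events');
/// -- `kafka_src` keeps `source_id` and becomes a table reading from that source.
/// CREATE TABLE kafka_src FROM SOURCE kafka_src_progress (REFERENCE "events");
/// ```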
fn rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    catalog: &ConnCatalog<'_>,
) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };

    let mut updated_items = BTreeMap::new();

    let mut sources = vec![];
    let mut subsources = vec![];

    for item in tx.get_items() {
        let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        match stmt {
            Statement::CreateSubsource(stmt) => subsources.push((item, stmt)),
            Statement::CreateSource(stmt) => sources.push((item, stmt)),
            _ => {}
        }
    }

    let mut pending_progress_items = BTreeMap::new();
    let mut migrated_source_ids = BTreeMap::new();
    // We first go over the sources, which depending on the kind determine what happens with the
    // progress statements.
    for (mut source_item, source_stmt) in sources {
        let CreateSourceStatement {
            name,
            in_cluster,
            col_names,
            mut connection,
            include_metadata,
            format,
            envelope,
            if_not_exists,
            key_constraint,
            with_options,
            external_references,
            progress_subsource,
        } = source_stmt;

        let (progress_name, progress_item) = match progress_subsource {
            Some(DeferredItemName::Named(RawItemName::Name(name))) => {
                let partial_name = normalize::unresolved_item_name(name.clone())?;
                (name, catalog.resolve_item(&partial_name)?)
            }
            Some(DeferredItemName::Named(RawItemName::Id(id, name, _))) => {
                let gid = id.parse()?;
                (name, catalog.get_item(&gid))
            }
            Some(DeferredItemName::Deferred(_)) => {
                unreachable!("invalid progress subsource")
            }
            None => {
                info!("migrate: skipping already migrated source: {name}");
                continue;
            }
        };
        let raw_progress_name =
            RawItemName::Id(progress_item.id().to_string(), progress_name.clone(), None);

        // We need to jump through some hoops to get to the raw item name of the source
        let catalog_item = catalog.get_item(&source_item.id);
        let source_name: &QualifiedItemName = catalog_item.name();
        let full_source_name: FullItemName = catalog.resolve_full_name(source_name);
        let source_name: UnresolvedItemName = normalize::unresolve(full_source_name.clone());

        // First, strip the connection options that we no longer need
        match &mut connection {
            CreateSourceConnection::Postgres { options, .. } => {
                options.retain(|o| match o.name {
                    PgConfigOptionName::Details | PgConfigOptionName::Publication => true,
                    PgConfigOptionName::TextColumns | PgConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::SqlServer { options, .. } => {
                options.retain(|o| match o.name {
                    SqlServerConfigOptionName::Details => true,
                    SqlServerConfigOptionName::TextColumns
                    | SqlServerConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::MySql { options, .. } => {
                options.retain(|o| match o.name {
                    MySqlConfigOptionName::Details => true,
                    MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns => {
                        false
                    }
                });
            }
            CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. } => {
            }
        }

        // Then, figure out the new statements for the progress and source.
        let (new_progress_name, new_progress_stmt, new_source_name, new_source_stmt) =
            match connection {
                connection @ (CreateSourceConnection::Postgres { .. }
                | CreateSourceConnection::MySql { .. }
                | CreateSourceConnection::SqlServer { .. }
                | CreateSourceConnection::LoadGenerator {
                    generator:
                        LoadGenerator::Tpch | LoadGenerator::Auction | LoadGenerator::Marketing,
                    ..
                }) => {
                    // Assert the expected state of the source
                    assert_eq!(col_names, &[]);
                    assert_eq!(key_constraint, None);
                    assert_eq!(format, None);
                    assert_eq!(envelope, None);
                    assert_eq!(include_metadata, &[]);
                    assert_eq!(external_references, None);

                    // This is a dummy replacement statement for the source object of multi-output
                    // sources. It describes the query `TABLE source_name`. This ensures that
                    // whoever used to run select queries against the `source_name` + "_progress"
                    // object still gets the same data after the migration. This switch does
                    // change the schema of the object with `source_item.id`, but because we're
                    // turning it into a view, which is not durable, that's ok. We'll never open a
                    // persist shard for this global id again.
                    let dummy_source_stmt = Statement::CreateView(CreateViewStatement {
                        if_exists: IfExistsBehavior::Error,
                        temporary: false,
                        definition: ViewDefinition {
                            name: progress_name,
                            columns: vec![],
                            query: Query {
                                ctes: CteBlock::Simple(vec![]),
                                body: SetExpr::Table(RawItemName::Id(
                                    progress_item.id().to_string(),
                                    source_name.clone(),
                                    None,
                                )),
                                order_by: vec![],
                                limit: None,
                                offset: None,
                            },
                        },
                    });

                    let new_progress_stmt = CreateSourceStatement {
                        name: source_name.clone(),
                        in_cluster,
                        col_names: vec![],
                        connection,
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };

                    migrated_source_ids.insert(source_item.id, progress_item.id());

                    (
                        full_source_name.item,
                        new_progress_stmt,
                        progress_item.name().item.clone(),
                        dummy_source_stmt,
                    )
                }
                CreateSourceConnection::Kafka {
                    options,
                    connection,
                } => {
                    let constraints = if let Some(_key_constraint) = key_constraint {
                        // The `PRIMARY KEY ... NOT ENFORCED` feature is not enabled for anyone.
                        // TODO: remove the feature altogether
                        vec![]
                    } else {
                        vec![]
                    };

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // All source tables must have a `details` option, which is a serialized proto
                    // describing any source-specific details for this table statement.
                    let details = SourceExportStatementDetails::Kafka {};
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
                    // The external reference for a Kafka source is just the topic name.
                    let topic_option = options
                        .iter()
                        .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                        .expect("kafka sources must have a topic");
                    let topic = match &topic_option.value {
                        Some(WithOptionValue::Value(Value::String(topic))) => topic,
                        _ => unreachable!("topic must be a string"),
                    };
                    let external_reference = UnresolvedItemName::qualified(&[Ident::new(topic)?]);

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::Kafka {
                            options,
                            connection,
                        },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
                CreateSourceConnection::LoadGenerator {
                    generator:
                        generator @ (LoadGenerator::Clock
                        | LoadGenerator::Counter
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue),
                    options,
                } => {
                    let constraints = if let Some(_key_constraint) = key_constraint {
                        // Should we ignore not enforced primary key constraints here?
                        vec![]
                    } else {
                        vec![]
                    };

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // All source tables must have a `details` option, which is a serialized proto
                    // describing any source-specific details for this table statement.
                    let details = SourceExportStatementDetails::LoadGenerator {
                        output: LoadGeneratorOutput::Default,
                    };
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
                    // Since these load generators are single-output the external reference
                    // uses the schema-name for both namespace and name.
                    let external_reference = FullItemName {
                        database: mz_sql::names::RawDatabaseSpecifier::Name(
                            mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                .to_owned(),
                        ),
                        schema: generator.schema_name().to_string(),
                        item: generator.schema_name().to_string(),
                    };

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference.into()),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::LoadGenerator { generator, options },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
            };

        // The source can be updated right away but the replacement progress statement will
        // be installed in the next loop where we go over subsources.

        info!(
            "migrate: converted source {} to {}",
            source_item.create_sql, new_source_stmt
        );
        source_item.name = new_source_name.clone();
        source_item.create_sql = new_source_stmt.to_ast_string_stable();
        updated_items.insert(source_item.id, source_item);
        pending_progress_items.insert(progress_item.id(), (new_progress_name, new_progress_stmt));
    }

    for (mut item, stmt) in subsources {
        match stmt {
            // Migrate progress statements to the corresponding statement produced from the
            // previous step.
            CreateSubsourceStatement {
                of_source: None, ..
            } => {
                let Some((new_name, new_stmt)) = pending_progress_items.remove(&item.id) else {
                    panic!("encountered orphan progress subsource id: {}", item.id)
                };
                item.name = new_name;
                item.create_sql = new_stmt.to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
            // Migrate each `CREATE SUBSOURCE` statement to an equivalent
            // `CREATE TABLE ... FROM SOURCE` statement.
            CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source: Some(raw_source_name),
                if_not_exists,
                mut with_options,
            } => {
                let new_raw_source_name = match raw_source_name {
                    RawItemName::Id(old_id, name, None) => {
                        let old_id: CatalogItemId = old_id.parse().expect("well formed");
                        let new_id = migrated_source_ids[&old_id].clone();
                        RawItemName::Id(new_id.to_string(), name, None)
                    }
                    _ => unreachable!("unexpected source name: {raw_source_name}"),
                };
                // The external reference is a `with_option` on subsource statements but is a
                // separate field on table statements.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                // The `details` option on both subsources and tables is identical, using the same
                                // ProtoSourceExportStatementDetails serialized value.
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                                name: TableFromSourceOptionName::RetainHistory,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("This option is handled separately above.")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source: new_raw_source_name,
                    external_reference: Some(external_reference),
                    with_options,
                    // Subsources don't have `envelope`, `include_metadata`, or `format` options.
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
        }
    }
    assert!(
        pending_progress_items.is_empty(),
        "unexpected residual progress items: {pending_progress_items:?}"
    );

    tx.update_items(updated_items)?;

    Ok(())
}

// Durable migrations

/// Migrations that run only on the durable catalog before any data is loaded into memory.
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // Migrate the expression cache to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // Migrate the builtin migration shard to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }

    if tx
        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
        .is_none()
    {
        let mut nonce = [0u8; 24];
        let _ = openssl::rand::rand_bytes(&mut nonce).expect("failed to generate nonce");
        let nonce = BASE64_STANDARD.encode(nonce);
        tx.set_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string(), Some(nonce))?;
    }

    Ok(())
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.

// Remove PARTITION STRATEGY from CREATE SINK statements.
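//
// For illustration only (the sink name and the remaining option are made up), a statement
// stored roughly as
//     CREATE SINK s ... WITH (PARTITION STRATEGY = 'v1', SNAPSHOT = false)
// is rewritten to approximately
//     CREATE SINK s ... WITH (SNAPSHOT = false)
// with all other options left untouched.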
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}

// Migrate SQL Server constraint information from the columns to the dedicated constraints field.
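//
// Sketch of the transformation (field names abbreviated, values made up): a column description
// like
//     { name: "id", primary_key_constraint: Some("pk_tbl") }
// has its constraint name taken out and contributes to a table-level entry like
//     { constraint_name: "pk_tbl", constraint_type: PrimaryKey, column_names: ["id"] }
// in `table.constraints`, so that columns no longer carry constraint information.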
fn ast_rewrite_sql_server_constraints(stmt: &mut Statement<Raw>) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSubsourceOptionName, TableFromSourceOptionName, Value, WithOptionValue,
    };
    use mz_sql_server_util::desc::{SqlServerTableConstraint, SqlServerTableConstraintType};
    use mz_storage_types::sources::ProtoSourceExportStatementDetails;
    use mz_storage_types::sources::proto_source_export_statement_details::Kind;

    let deets: Option<&mut String> = match stmt {
        Statement::CreateSubsource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
            if matches!(option.name, CreateSubsourceOptionName::Details)
                && let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
            {
                Some(details)
            } else {
                None
            }
        }),
        Statement::CreateTableFromSource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
            if matches!(option.name, TableFromSourceOptionName::Details)
                && let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
            {
                Some(details)
            } else {
                None
            }
        }),
        _ => None,
    };
    let Some(deets) = deets else {
        return Ok(());
    };

    let current_value = hex::decode(&mut *deets)?;
    let current_value = ProtoSourceExportStatementDetails::decode(&*current_value)?;

    // avoid further work if this isn't SQL Server
    if !matches!(current_value.kind, Some(Kind::SqlServer(_))) {
        return Ok(());
    };

    let SourceExportStatementDetails::SqlServer {
        mut table,
        capture_instance,
        initial_lsn,
    } = SourceExportStatementDetails::from_proto(current_value)?
    else {
        unreachable!("statement details must exist for SQL Server");
    };

    // Migration has already occurred or did not need to happen.
    if !table.constraints.is_empty() {
        return Ok(());
    }

    // Relocates the primary key constraint information from the individual columns to the
    // constraints field. This ensures that the columns no longer hold constraint information.
    let mut migrated_constraints: BTreeMap<_, Vec<_>> = BTreeMap::new();
    for col in table.columns.iter_mut() {
        if let Some(constraint_name) = col.primary_key_constraint.take() {
            migrated_constraints
                .entry(constraint_name)
                .or_default()
                .push(col.name.to_string());
        }
    }

    table.constraints = migrated_constraints
        .into_iter()
        .map(|(constraint_name, column_names)| SqlServerTableConstraint {
            constraint_name: constraint_name.to_string(),
            constraint_type: SqlServerTableConstraintType::PrimaryKey,
            column_names,
        })
        .collect();

    let new_value = SourceExportStatementDetails::SqlServer {
        table,
        capture_instance,
        initial_lsn,
    };
    *deets = hex::encode(new_value.into_proto().encode_to_vec());

    Ok(())
}

/// Add missing item IDs to the ON clauses of CREATE INDEX statements.
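///
/// For illustration only (names and the ID are made up): an index whose `ON` clause references
/// `materialize.public.t` by name alone is rewritten to carry the catalog item ID as well,
/// rendering roughly as `ON [u123 AS materialize.public.t]`, with the ID looked up from the
/// databases, schemas, and items stored in the durable catalog.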
fn ast_rewrite_add_missing_index_ids(
    tx: &Transaction<'_>,
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateIndex(stmt) = stmt else {
        return Ok(());
    };

    let unresolved_name = match stmt.on_name.clone() {
        mz_sql::ast::RawItemName::Name(name) => name,
        // ID already present; nothing to do.
        mz_sql::ast::RawItemName::Id(..) => return Ok(()),
    };

    let parts = &unresolved_name.0;
    let (db_name, schema_name, item_name) = match parts.len() {
        3 => (Some(&parts[0]), &parts[1], &parts[2]),
        2 => (None, &parts[0], &parts[1]),
        _ => panic!("invalid unresolved name: {unresolved_name:?}"),
    };

    let db_id = db_name.map(|x| {
        let db = tx.get_databases().find(|db| db.name == x.as_str());
        let db = db.unwrap_or_else(|| panic!("missing database: {x}"));
        db.id
    });
    let schema_id = {
        let schema = tx
            .get_schemas()
            .find(|s| s.name == schema_name.as_str() && s.database_id == db_id);
        let schema = schema.unwrap_or_else(|| panic!("missing schema: {schema_name}, {db_id:?}"));
        schema.id
    };
    let item_id = {
        let item = tx
            .get_items()
            .find(|i| i.name == item_name.as_str() && i.schema_id == schema_id);
        let item = item.unwrap_or_else(|| panic!("missing item: {item_name}, {schema_id:?}"));
        item.id
    };

    stmt.on_name = mz_sql::ast::RawItemName::Id(item_id.to_string(), unresolved_name, None);

    Ok(())
}