mz_adapter/catalog/migrate.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::Transaction;
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::CreateSinkOptionName;
use mz_sql::ast::display::AstDisplay;
use mz_sql::names::FullItemName;
use mz_sql_parser::ast::{IdentError, Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use semver::Version;
use tracing::info;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};

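/// Rewrites the raw SQL AST of every item stored in `tx`, staging any mutated
/// statements back to the transaction in a single `update_items` call.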
fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

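/// Like [`rewrite_ast_items`], but additionally provides the closure with a
/// [`ConnCatalog`], for post-planning migrations that need to resolve catalog state
/// while rewriting.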
fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a &ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, &cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

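/// The result of running [`migrate`]: builtin table updates for the migrated user
/// items, plus any non-item updates that must be applied after all item updates.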
pub(crate) struct MigrateResult {
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

/// Migrates all user items and loads them into `state`.
///
/// Returns the builtin updates corresponding to all user items.
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = tx.get_catalog_content_version();
    let catalog_version = match catalog_version {
        Some(v) => Version::parse(v)?,
        None => Version::new(0, 0, 0),
    };

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    // Special block for the `ast_rewrite_sources_to_tables` migration, since it is
    // gated behind a feature flag and needs to update multiple AST items at once.
    if state.system_config().force_source_table_syntax() {
        ast_rewrite_sources_to_tables(tx, now)?;
    }

    rewrite_ast_items(tx, |_tx, _id, stmt| {
        // Add per-item AST migrations below.
        //
        // Each migration should be a function that takes `stmt` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `stmt` will be staged for commit to the catalog.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        Ok(())
    })?;

    // Load items into catalog. We make sure to consolidate the old updates with the new updates to
    // avoid trying to apply unmigrated items.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Since some migrations might introduce non-item 'post-item' updates, we sequester those
    // so they can be applied with other post-item updates after migrations to avoid
    // accumulating negative diffs.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        // The only post-item update kind we currently generate is to
        // update storage collection metadata.
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();
    let mut ast_builtin_table_updates = state
        .apply_updates_for_bootstrap(item_updates, local_expr_cache)
        .await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        // Add per-item, post-planning AST migrations below. Most
        // migrations should be in the above `rewrite_ast_items` block.
        //
        // Each migration should be a function that takes `item` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `item` will be staged for commit to the catalog.
        //
        // Be careful if you reference `conn_cat`. Doing so is *weird*,
        // as you'll be rewriting the catalog while looking at it. If
        // possible, make your migration independent of `conn_cat`, and only
        // consider a single item at a time.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        Ok(())
    })?;

    // Add whole-catalog migrations below.
    //
    // Each migration should be a function that takes `tx` and `conn_cat` as
    // input and stages arbitrary transformations to the catalog on `tx`.

    let op_item_updates = tx.get_and_commit_op_updates();
    let item_builtin_table_updates = state
        .apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
        .await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );
    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        post_item_updates,
    })
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.
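//
// For an existing example, see `ast_rewrite_create_sink_partition_strategy` below,
// which removes a retired `CREATE SINK` option from stored statements.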

/// Migrates all sources to use the new sources-as-tables model.
///
/// First we migrate existing `CREATE SUBSOURCE` statements, turning them into
/// `CREATE TABLE .. FROM SOURCE` statements. This covers existing Postgres,
/// MySQL, and multi-output (tpch, auction, marketing) load-generator subsources.
///
/// Second we migrate existing `CREATE SOURCE` statements for these multi-output
/// sources to remove any subsource-specific options (e.g. TEXT COLUMNS).
///
/// Third we migrate existing single-output `CREATE SOURCE` statements.
/// This includes existing Kafka and single-output load-generator
/// sources. This will generate an additional `CREATE TABLE .. FROM SOURCE`
/// statement that copies over all the export-specific options. This table will
/// point to the existing source statement's persist shard but use a new GlobalId.
/// The original source statement will be updated to remove the export-specific options,
/// will be renamed to `<original_name>_source`, and will use a new, empty shard while
/// keeping its original GlobalId.
///
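/// As a rough, illustrative sketch (not the exact stored SQL), a subsource such as
/// `CREATE SUBSOURCE t (...) OF SOURCE s WITH (EXTERNAL REFERENCE ...)` is rewritten
/// into an equivalent `CREATE TABLE t (...) FROM SOURCE s ...` statement whose
/// external reference and remaining options are carried over from the subsource.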
fn ast_rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    now: NowFn,
) -> Result<(), anyhow::Error> {
    use maplit::btreemap;
    use maplit::btreeset;
    use mz_persist_types::ShardId;
    use mz_proto::RustType;
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };
    use mz_storage_client::controller::StorageTxn;
    use mz_storage_types::sources::SourceExportStatementDetails;
    use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
    use prost::Message;

    let items_with_statements = tx
        .get_items()
        .map(|item| {
            let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
            Ok((item, stmt))
        })
        .collect::<Result<Vec<_>, anyhow::Error>>()?;
    let items_with_statements_copied = items_with_statements.clone();

    let item_names_per_schema = items_with_statements_copied
        .iter()
        .map(|(item, _)| (item.schema_id.clone(), &item.name))
        .fold(BTreeMap::new(), |mut acc, (schema_id, name)| {
            acc.entry(schema_id)
                .or_insert_with(|| btreeset! {})
                .insert(name);
            acc
        });

    // Any CatalogItemId that should be changed to a new CatalogItemId in any statements that
    // reference it. This is necessary for ensuring downstream statements (e.g.
    // mat views, indexes) that reference a single-output source (e.g. kafka)
    // will now reference the corresponding new table, with the same data, instead.
    let mut changed_ids = BTreeMap::new();

    for (mut item, stmt) in items_with_statements {
        match stmt {
            // Migrate each `CREATE SUBSOURCE` statement to an equivalent
            // `CREATE TABLE ... FROM SOURCE` statement.
            Statement::CreateSubsource(CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source,
                if_not_exists,
                mut with_options,
            }) => {
                let raw_source_name = match of_source {
                    // If `of_source` is None then this is a `progress` subsource which we
                    // are not migrating as they are not currently relevant to the new table model.
                    None => continue,
                    Some(name) => name,
                };
                let source = match raw_source_name {
                    // Some legacy subsources have named-only references to their `of_source`
                    // so we ensure we always use an ID-based reference in the stored
                    // `CREATE TABLE ... FROM SOURCE` statements.
                    RawItemName::Name(name) => {
                        // Convert the name reference to an ID reference.
                        let (source_item, _) = items_with_statements_copied
                            .iter()
                            .find(|(_, statement)| match statement {
                                Statement::CreateSource(stmt) => stmt.name == name,
                                _ => false,
                            })
                            .expect("source must exist");
                        RawItemName::Id(source_item.id.to_string(), name, None)
                    }
                    RawItemName::Id(..) => raw_source_name,
                };

                // The external reference is a `with_option` on subsource statements but is a
                // separate field on table statements.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                // The `details` option on both subsources and tables is identical, using the same
                                // ProtoSourceExportStatementDetails serialized value.
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("This option is handled separately above.")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: mz_sql::ast::TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source,
                    external_reference: Some(external_reference.clone()),
                    with_options,
                    // Subsources don't have `envelope`, `include_metadata`, or `format` options.
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                tx.update_item(item.id, item)?;
            }

            // Postgres sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    mut conn @ (CreateSourceConnection::Postgres { .. }
                    | CreateSourceConnection::Yugabyte { .. }),
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::Postgres { options, .. } => options,
                    CreateSourceConnection::Yugabyte { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // This option, which stores text columns on the primary source statement, is
                // redundant with the option on subsource statements, so it can simply be removed.
                // It was kept for round-tripping of `CREATE SOURCE` statements that automatically
                // generated subsources, which is no longer necessary.
                if options
                    .iter()
                    .any(|o| matches!(o.name, PgConfigOptionName::TextColumns))
                {
                    options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted postgres source {stmt} to remove subsource options");
                }
            }
            // MySQL sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection: mut conn @ CreateSourceConnection::MySql { .. },
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
                ..
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::MySql { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // These options, which store text and exclude columns on the primary source
                // statement, are redundant with the options on subsource statements, so they can
                // simply be removed. They were kept for round-tripping of `CREATE SOURCE`
                // statements that automatically generated subsources, which is no longer necessary.
                if options.iter().any(|o| {
                    matches!(
                        o.name,
                        MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns
                    )
                }) {
                    options.retain(|o| {
                        !matches!(
                            o.name,
                            MySqlConfigOptionName::TextColumns
                                | MySqlConfigOptionName::ExcludeColumns
                        )
                    });
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted mysql source {stmt} to remove subsource options");
                }
            }
            // Multi-output load generator sources whose subsources are already
            // migrated above. There is no need to remove any options from this
            // statement since they are not export-specific.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Auction | LoadGenerator::Marketing | LoadGenerator::Tpch,
                        ..
                    },
                ..
            }) => {}
            // Single-output sources that need to be migrated to tables. These sources currently output
            // data to the primary collection of the source statement. We will create a new table
            // statement for them and move all export-specific options over from the source statement,
            // while moving the `CREATE SOURCE` statement to a new name and moving its shard to the
            // new table statement.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    conn @ (CreateSourceConnection::Kafka { .. }
                    | CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Clock
                            | LoadGenerator::Datums
                            | LoadGenerator::Counter
                            | LoadGenerator::KeyValue,
                        ..
                    }),
                name,
                col_names,
                include_metadata,
                format,
                envelope,
                with_options,
                if_not_exists,
                in_cluster,
                progress_subsource,
                external_references,
                key_constraint,
            }) => {
                // To check if this is a source that has already been migrated we use a basic
                // heuristic: if there is at least one existing table for the source, and if
                // the envelope/format/include_metadata options are empty, we assume it's
                // already been migrated.
                let tables_for_source =
                    items_with_statements_copied
                        .iter()
                        .any(|(_, statement)| match statement {
                            Statement::CreateTableFromSource(stmt) => {
                                let source: CatalogItemId = match &stmt.source {
                                    RawItemName::Name(_) => {
                                        unreachable!("tables store source as ID")
                                    }
                                    RawItemName::Id(source_id, _, _) => {
                                        source_id.parse().expect("valid id")
                                    }
                                };
                                source == item.id
                            }
                            _ => false,
                        });
                if tables_for_source
                    && envelope.is_none()
                    && format.is_none()
                    && include_metadata.is_empty()
                {
                    info!("migrate: skipping already migrated source: {}", name);
                    continue;
                }

                // Use the current source name as the new table name, and rename the source to
                // `<source_name>_source`. This is intended to keep existing queries that
                // reference the source name working: that name now resolves to the new table,
                // which holds the data.
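                // For example (illustrative name only): a Kafka source `orders` becomes a
                // source named `orders_source`, and a new table named `orders` takes over the
                // original persist shard.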

                assert_eq!(
                    item.name,
                    name.0.last().expect("at least one ident").to_string()
                );
                // First find an unused name within the same schema to avoid conflicts.
                let is_valid = |new_source_ident: &Ident| {
                    if item_names_per_schema
                        .get(&item.schema_id)
                        .expect("schema must exist")
                        .contains(&new_source_ident.to_string())
                    {
                        Ok::<_, IdentError>(false)
                    } else {
                        Ok(true)
                    }
                };
                let new_source_ident =
                    Ident::try_generate_name(item.name.clone(), "_source", is_valid)?;

                // We will use the original item name for the new table item.
                let table_item_name = item.name.clone();

                // Update the source item/statement to use the new name.
                let mut new_source_name = name.clone();
                *new_source_name.0.last_mut().expect("at least one ident") =
                    new_source_ident.clone();
                item.name = new_source_ident.to_string();

                // A reference to the source that will be included in the table statement.
                let source_ref =
                    RawItemName::Id(item.id.to_string(), new_source_name.clone(), None);

                let columns = if col_names.is_empty() {
                    TableFromSourceColumns::NotSpecified
                } else {
                    TableFromSourceColumns::Named(col_names)
                };

                // All source tables must have a `details` option, which is a serialized proto
                // describing any source-specific details for this table statement.
                let details = match &conn {
                    // For kafka sources this proto is currently empty.
                    CreateSourceConnection::Kafka { .. } => SourceExportStatementDetails::Kafka {},
                    CreateSourceConnection::LoadGenerator { .. } => {
                        // Since these load generators are single-output we use the default output.
                        SourceExportStatementDetails::LoadGenerator {
                            output: LoadGeneratorOutput::Default,
                        }
                    }
                    _ => unreachable!("match determined above"),
                };
                let table_with_options = vec![TableFromSourceOption {
                    name: TableFromSourceOptionName::Details,
                    value: Some(WithOptionValue::Value(Value::String(hex::encode(
                        details.into_proto().encode_to_vec(),
                    )))),
                }];

                // Generate the same external reference that would have been generated
                // during purification for single-output sources.
                let external_reference = match &conn {
                    CreateSourceConnection::Kafka { options, .. } => {
                        let topic_option = options
                            .iter()
                            .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                            .expect("kafka sources must have a topic");
                        let topic = match &topic_option.value {
                            Some(WithOptionValue::Value(Value::String(topic))) => topic,
                            _ => unreachable!("topic must be a string"),
                        };

                        Some(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
                    }
                    CreateSourceConnection::LoadGenerator { generator, .. } => {
                        // Since these load generators are single-output, the external reference
                        // uses the schema name for both namespace and name.
                        let name = FullItemName {
                                database: mz_sql::names::RawDatabaseSpecifier::Name(
                                    mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                        .to_owned(),
                                ),
                                schema: generator.schema_name().to_string(),
                                item: generator.schema_name().to_string(),
                            };
                        Some(UnresolvedItemName::from(name))
                    }
                    _ => unreachable!("match determined above"),
                };

                // The new table statement, stealing the name and the export-specific fields from
                // the create source statement.
                let table = CreateTableFromSourceStatement {
                    name,
                    constraints: vec![],
                    columns,
                    if_not_exists: false,
                    source: source_ref,
                    external_reference,
                    with_options: table_with_options,
                    envelope,
                    include_metadata,
                    format,
                };

                // The source statement with a new name and many of its fields emptied.
                let source = CreateSourceStatement {
                    connection: conn,
                    name: new_source_name,
                    if_not_exists,
                    in_cluster,
                    include_metadata: vec![],
                    format: None,
                    envelope: None,
                    col_names: vec![],
                    with_options,
                    key_constraint,
                    external_references,
                    progress_subsource,
                };

                let source_id = item.id;
                let source_global_id = item.global_id;
                let schema_id = item.schema_id.clone();
                let schema = tx.get_schema(&item.schema_id).expect("schema must exist");

                let owner_id = item.owner_id.clone();
                let privileges = item.privileges.clone();
                let extra_versions = item.extra_versions.clone();

                // Update the source statement in the catalog first, since the name will
                // otherwise conflict with the new table statement.
                info!("migrate: updated source {} to {source}", item.create_sql);
                item.create_sql = Statement::CreateSource(source).to_ast_string_stable();
                tx.update_item(item.id, item)?;

                // Insert the new table statement into the catalog with a new id.
                let ids = tx.allocate_user_item_ids(1)?;
                let (new_table_id, new_table_global_id) = ids[0];
                info!("migrate: added table {new_table_id}: {table}");
                tx.insert_user_item(
                    new_table_id,
                    new_table_global_id,
                    schema_id,
                    &table_item_name,
                    table.to_ast_string_stable(),
                    owner_id,
                    privileges,
                    &Default::default(),
                    extra_versions,
                )?;
                // We need to move the shard currently attached to the source statement to the
                // table statement such that the existing data in the shard is preserved and can
                // be queried on the new table statement. However, we need to keep the GlobalId of
                // the source the same, to preserve existing references to that statement in
                // external tools such as DBT and Terraform. We therefore assign the source
                // statement a new shard ID; the corresponding (empty) shard will be created
                // automatically after the migration is complete.
                let new_source_shard = ShardId::new();
                let (source_global_id, existing_source_shard) = tx
                    .delete_collection_metadata(btreeset! {source_global_id})
                    .pop()
                    .expect("shard should exist");
                tx.insert_collection_metadata(btreemap! {
                    new_table_global_id => existing_source_shard,
                    source_global_id => new_source_shard
                })?;

                add_to_audit_log(
                    tx,
                    mz_audit_log::EventType::Create,
                    mz_audit_log::ObjectType::Table,
                    mz_audit_log::EventDetails::IdFullNameV1(mz_audit_log::IdFullNameV1 {
                        id: new_table_id.to_string(),
                        name: mz_audit_log::FullNameV1 {
                            database: schema
                                .database_id
                                .map(|d| d.to_string())
                                .unwrap_or_default(),
                            schema: schema.name,
                            item: table_item_name,
                        },
                    }),
                    now(),
                )?;

                // We also need to update any other statements that reference the source to use the new
                // table id/name instead.
                changed_ids.insert(source_id, new_table_id);
            }

            // TODO(sql_server1): Consider how to migrate SQL Server subsources
            // to the source table world.
            Statement::CreateSource(CreateSourceStatement {
                connection: CreateSourceConnection::SqlServer { .. },
                ..
            }) => (),

            #[expect(unreachable_patterns)]
            Statement::CreateSource(_) => {}
            _ => (),
        }
    }

    let mut updated_items = BTreeMap::new();
    for (mut item, mut statement) in items_with_statements_copied {
        match &statement {
            // Don't rewrite any of the statements we just migrated.
            Statement::CreateSource(_) => {}
            Statement::CreateSubsource(_) => {}
            Statement::CreateTableFromSource(_) => {}
            // We need to rewrite any statements that reference a source id to use the new
            // table id instead, since any contained data in the source will now be in the table.
            // This assumes the table has stolen the source's name, which is the case
            // for all sources that were migrated.
            _ => {
                if mz_sql::names::modify_dependency_item_ids(&mut statement, &changed_ids) {
                    info!("migrate: updated dependency reference in statement {statement}");
                    item.create_sql = statement.to_ast_string_stable();
                    updated_items.insert(item.id, item);
                }
            }
        }
    }
    if !updated_items.is_empty() {
        tx.update_items(updated_items)?;
    }

    Ok(())
}

// Durable migrations

/// Migrations that run only on the durable catalog before any data is loaded into memory.
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // Migrate the expression cache to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // Migrate the builtin migration shard to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }
    Ok(())
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.

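/// Appends a single audit log event to the durable catalog as part of a migration,
/// allocating the next audit log ID from `tx`.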
fn add_to_audit_log(
    tx: &mut Transaction,
    event_type: mz_audit_log::EventType,
    object_type: mz_audit_log::ObjectType,
    details: mz_audit_log::EventDetails,
    occurred_at: mz_ore::now::EpochMillis,
) -> Result<(), anyhow::Error> {
    let id = tx.get_and_increment_id(mz_catalog::durable::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
    let event =
        mz_audit_log::VersionedEvent::new(id, event_type, object_type, details, None, occurred_at);
    tx.insert_audit_log_event(event);
    Ok(())
}

/// Removes the `PARTITION STRATEGY` option from `CREATE SINK` statements.
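///
/// For example (illustrative), `CREATE SINK ... WITH (PARTITION STRATEGY = '...')`
/// becomes the same statement with the `PARTITION STRATEGY` option dropped.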
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}