mz_adapter/catalog/
migrate.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::Transaction;
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::CreateSinkOptionName;
use mz_sql::ast::display::AstDisplay;
use mz_sql::names::FullItemName;
use mz_sql_parser::ast::{IdentError, Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use semver::Version;
use tracing::info;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};

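/// Applies `f` to the parsed AST of every user item in `tx`, writing any
/// mutated statements back to the transaction.
///
/// A minimal usage sketch (the migration function named here is hypothetical):
///
/// ```ignore
/// rewrite_ast_items(tx, |_tx, _id, stmt| {
///     // Hypothetical per-item migration that mutates `stmt` in place.
///     ast_rewrite_remove_some_option(stmt)?;
///     Ok(())
/// })?;
/// ```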
fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

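/// Applies `f` to the parsed AST of every user item in `tx`, passing along a
/// read-only [`ConnCatalog`] for migrations that need to consult catalog
/// state, and writes any mutated statements back to the transaction.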
fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

pub(crate) struct MigrateResult {
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

/// Migrates all user items and loads them into `state`.
///
/// Returns the builtin updates corresponding to all user items.
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = tx.get_catalog_content_version();
    let catalog_version = match catalog_version {
        Some(v) => Version::parse(v)?,
        None => Version::new(0, 0, 0),
    };

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    // Special block for the `ast_rewrite_sources_to_tables` migration, which
    // is gated behind a feature flag and needs to update multiple AST items
    // at once.
    if state.system_config().force_source_table_syntax() {
        ast_rewrite_sources_to_tables(tx, now)?;
    }

    rewrite_ast_items(tx, |_tx, _id, stmt| {
        // Add per-item AST migrations below.
        //
        // Each migration should be a function that takes `stmt` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `stmt` will be staged for commit to the catalog.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        Ok(())
    })?;

    // Load the items into the catalog. We make sure to consolidate the old
    // updates with the new updates to avoid trying to apply unmigrated items.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Some migrations might introduce non-item updates. We sequester those so
    // they can be applied with the other post-item updates after migration,
    // which avoids accumulating negative diffs.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        // The only post-item update kind we currently generate is to
        // update storage collection metadata.
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();
    let mut ast_builtin_table_updates = state
        .apply_updates_for_bootstrap(item_updates, local_expr_cache)
        .await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        // Add per-item, post-planning AST migrations below. Most
        // migrations should be in the `rewrite_ast_items` block above.
        //
        // Each migration should be a function that takes `stmt` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `stmt` will be staged for commit to the catalog.
        //
        // Be careful if you reference `conn_cat`. Doing so is *weird*,
        // as you'll be rewriting the catalog while looking at it. If
        // possible, make your migration independent of `conn_cat`, and only
        // consider a single item at a time.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        Ok(())
    })?;

    // Add whole-catalog migrations below.
    //
    // Each migration should be a function that takes `tx` and `conn_cat` as
    // input and stages arbitrary transformations to the catalog on `tx`.

    let op_item_updates = tx.get_and_commit_op_updates();
    let item_builtin_table_updates = state
        .apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
        .await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );
    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        post_item_updates,
    })
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.
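
// As an illustrative sketch only (the option and function names below are
// hypothetical): a new AST migration added under this heading would typically
// look like
//
//     /// Removes the deprecated `FOO` option from `CREATE MATERIALIZED VIEW`
//     /// statements.
//     fn ast_rewrite_remove_foo_option_v0_130(
//         stmt: &mut Statement<Raw>,
//     ) -> Result<(), anyhow::Error> {
//         // Pattern-match the statement kind of interest and mutate it in
//         // place; `rewrite_ast_items` stages the result for commit.
//         Ok(())
//     }
//
// and would be invoked from the `rewrite_ast_items` closure in `migrate`
// above, mirroring `ast_rewrite_create_sink_partition_strategy` at the bottom
// of this file.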

/// Migrates all sources to use the new sources-as-tables model.
///
/// First we migrate existing `CREATE SUBSOURCE` statements, turning them into
/// `CREATE TABLE .. FROM SOURCE` statements. This covers the subsources of
/// existing Postgres, MySQL, and multi-output (tpch, auction, marketing)
/// load-generator sources.
///
/// Second we migrate existing `CREATE SOURCE` statements for these multi-output
/// sources to remove any subsource-specific options (e.g. TEXT COLUMNS).
///
/// Third we migrate existing single-output `CREATE SOURCE` statements. This
/// includes existing Kafka and single-output load-generator sources. This step
/// generates an additional `CREATE TABLE .. FROM SOURCE` statement that copies
/// over all the export-specific options. The table takes over the existing
/// source statement's persist shard but uses a new GlobalId. The original
/// source statement is updated to remove the export-specific options, renamed
/// to `<original_name>_source`, and given a new, empty shard, while keeping
/// its original GlobalId.
///
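/// As a rough sketch (names and option lists illustrative, not exact output),
/// a subsource such as
///
/// ```text
/// CREATE SUBSOURCE "t1" (...) OF SOURCE "pg" WITH (EXTERNAL REFERENCE = "public"."t1", DETAILS = '...')
/// ```
///
/// is rewritten in the first step to
///
/// ```text
/// CREATE TABLE "t1" (...) FROM SOURCE "pg" (REFERENCE = "public"."t1") WITH (DETAILS = '...')
/// ```
///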
fn ast_rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    now: NowFn,
) -> Result<(), anyhow::Error> {
    use maplit::btreemap;
    use maplit::btreeset;
    use mz_persist_types::ShardId;
    use mz_proto::RustType;
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };
    use mz_storage_client::controller::StorageTxn;
    use mz_storage_types::sources::SourceExportStatementDetails;
    use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
    use prost::Message;

    let items_with_statements = tx
        .get_items()
        .map(|item| {
            let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
            Ok((item, stmt))
        })
        .collect::<Result<Vec<_>, anyhow::Error>>()?;
    let items_with_statements_copied = items_with_statements.clone();

    let item_names_per_schema = items_with_statements_copied
        .iter()
        .map(|(item, _)| (item.schema_id.clone(), &item.name))
        .fold(BTreeMap::new(), |mut acc, (schema_id, name)| {
            acc.entry(schema_id)
                .or_insert_with(|| btreeset! {})
                .insert(name);
            acc
        });

    // Maps any CatalogItemId that needs rewriting to the new CatalogItemId
    // that statements referencing it should use instead. This ensures that
    // downstream statements (e.g. materialized views, indexes) that referenced
    // a single-output source (e.g. Kafka) now reference the corresponding new
    // table, which holds the same data.
    let mut changed_ids = BTreeMap::new();

    for (mut item, stmt) in items_with_statements {
        match stmt {
            // Migrate each `CREATE SUBSOURCE` statement to an equivalent
            // `CREATE TABLE ... FROM SOURCE` statement.
            Statement::CreateSubsource(CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source,
                if_not_exists,
                mut with_options,
            }) => {
                let raw_source_name = match of_source {
                    // If `of_source` is `None` then this is a `progress`
                    // subsource, which we do not migrate since progress
                    // subsources are not currently relevant to the new table
                    // model.
                    None => continue,
                    Some(name) => name,
                };
                let source = match raw_source_name {
                    // Some legacy subsources have name-only references to
                    // their `of_source`, so we ensure we always use an
                    // ID-based reference in the stored
                    // `CREATE TABLE ... FROM SOURCE` statements.
                    RawItemName::Name(name) => {
                        // Convert the name reference to an ID reference.
                        let (source_item, _) = items_with_statements_copied
                            .iter()
                            .find(|(_, statement)| match statement {
                                Statement::CreateSource(stmt) => stmt.name == name,
                                _ => false,
                            })
                            .expect("source must exist");
                        RawItemName::Id(source_item.id.to_string(), name, None)
                    }
                    RawItemName::Id(..) => raw_source_name,
                };

                // The external reference is a `with_option` on subsource
                // statements but a separate field on table statements.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                // The `details` option is identical on both subsources and
                                // tables, using the same serialized
                                // ProtoSourceExportStatementDetails value.
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                                name: TableFromSourceOptionName::RetainHistory,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("this option is handled separately above")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: mz_sql::ast::TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source,
                    external_reference: Some(external_reference.clone()),
                    with_options,
                    // Subsources don't have `envelope`, `include_metadata`, or
                    // `format` options.
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                tx.update_item(item.id, item)?;
            }

            // Postgres sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection: mut conn @ CreateSourceConnection::Postgres { .. },
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::Postgres { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // The TEXT COLUMNS option on the primary source statement is
                // redundant with the option on the subsource statements, so it
                // can simply be removed. It was kept only for round-tripping of
                // `CREATE SOURCE` statements that automatically generated
                // subsources, which is no longer necessary.
                if options
                    .iter()
                    .any(|o| matches!(o.name, PgConfigOptionName::TextColumns))
                {
                    options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted postgres source {stmt} to remove subsource options");
                }
            }
            // MySQL sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection: mut conn @ CreateSourceConnection::MySql { .. },
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
                ..
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::MySql { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // The TEXT COLUMNS and EXCLUDE COLUMNS options on the primary
                // source statement are redundant with the options on the
                // subsource statements, so they can simply be removed. They
                // were kept only for round-tripping of `CREATE SOURCE`
                // statements that automatically generated subsources, which is
                // no longer necessary.
                if options.iter().any(|o| {
                    matches!(
                        o.name,
                        MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns
                    )
                }) {
                    options.retain(|o| {
                        !matches!(
                            o.name,
                            MySqlConfigOptionName::TextColumns
                                | MySqlConfigOptionName::ExcludeColumns
                        )
                    });
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted mysql source {stmt} to remove subsource options");
                }
            }
            // Multi-output load-generator sources whose subsources are already
            // migrated above. There is no need to remove any options from this
            // statement since none are export-specific.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Auction | LoadGenerator::Marketing | LoadGenerator::Tpch,
                        ..
                    },
                ..
            }) => {}
            // Single-output sources that need to be migrated to tables. These
            // sources currently output data to the primary collection of the
            // source statement. We create a new table statement for each of
            // them, move all export-specific options over from the source
            // statement to the table statement, rename the `CREATE SOURCE`
            // statement, and hand its shard over to the new table statement.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    conn @ (CreateSourceConnection::Kafka { .. }
                    | CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Clock
                            | LoadGenerator::Datums
                            | LoadGenerator::Counter
                            | LoadGenerator::KeyValue,
                        ..
                    }),
                name,
                col_names,
                include_metadata,
                format,
                envelope,
                with_options,
                if_not_exists,
                in_cluster,
                progress_subsource,
                external_references,
                key_constraint,
            }) => {
                // To check whether this source has already been migrated we
                // use a basic heuristic: if there is at least one existing
                // table for the source, and the envelope/format/include_metadata
                // options are empty, we assume it has already been migrated.
                let tables_for_source =
                    items_with_statements_copied
                        .iter()
                        .any(|(_, statement)| match statement {
                            Statement::CreateTableFromSource(stmt) => {
                                let source: CatalogItemId = match &stmt.source {
                                    RawItemName::Name(_) => {
                                        unreachable!("tables store source as ID")
                                    }
                                    RawItemName::Id(source_id, _, _) => {
                                        source_id.parse().expect("valid id")
                                    }
                                };
                                source == item.id
                            }
                            _ => false,
                        });
                if tables_for_source
                    && envelope.is_none()
                    && format.is_none()
                    && include_metadata.is_empty()
                {
                    info!("migrate: skipping already migrated source: {}", name);
                    continue;
                }

                // Use the current source name as the new table name, and
                // rename the source to `<source_name>_source`. Since the table
                // steals the source's name and holds the source's data,
                // existing queries that reference the source name continue to
                // work unchanged.

                assert_eq!(
                    item.name,
                    name.0.last().expect("at least one ident").to_string()
                );
                // First find an unused name within the same schema to avoid
                // conflicts.
                let is_valid = |new_source_ident: &Ident| {
                    if item_names_per_schema
                        .get(&item.schema_id)
                        .expect("schema must exist")
                        .contains(&new_source_ident.to_string())
                    {
                        Ok::<_, IdentError>(false)
                    } else {
                        Ok(true)
                    }
                };
                let new_source_ident =
                    Ident::try_generate_name(item.name.clone(), "_source", is_valid)?;

                // We will use the original item name for the new table item.
                let table_item_name = item.name.clone();

                // Update the source item/statement to use the new name.
                let mut new_source_name = name.clone();
                *new_source_name.0.last_mut().expect("at least one ident") =
                    new_source_ident.clone();
                item.name = new_source_ident.to_string();

                // A reference to the source that will be included in the table
                // statement.
                let source_ref =
                    RawItemName::Id(item.id.to_string(), new_source_name.clone(), None);

                let columns = if col_names.is_empty() {
                    TableFromSourceColumns::NotSpecified
                } else {
                    TableFromSourceColumns::Named(col_names)
                };

                // All source tables must have a `details` option, which is a
                // serialized proto describing any source-specific details for
                // this table statement.
                let details = match &conn {
                    // For Kafka sources this proto is currently empty.
                    CreateSourceConnection::Kafka { .. } => SourceExportStatementDetails::Kafka {},
                    CreateSourceConnection::LoadGenerator { .. } => {
                        // Since these load generators are single-output we use
                        // the default output.
                        SourceExportStatementDetails::LoadGenerator {
                            output: LoadGeneratorOutput::Default,
                        }
                    }
                    _ => unreachable!("match determined above"),
                };
                let table_with_options = vec![TableFromSourceOption {
                    name: TableFromSourceOptionName::Details,
                    value: Some(WithOptionValue::Value(Value::String(hex::encode(
                        details.into_proto().encode_to_vec(),
                    )))),
                }];

                // Generate the same external reference that would have been
                // generated during purification for single-output sources.
                let external_reference = match &conn {
                    CreateSourceConnection::Kafka { options, .. } => {
                        let topic_option = options
                            .iter()
                            .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                            .expect("kafka sources must have a topic");
                        let topic = match &topic_option.value {
                            Some(WithOptionValue::Value(Value::String(topic))) => topic,
                            _ => unreachable!("topic must be a string"),
                        };

                        Some(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
                    }
                    CreateSourceConnection::LoadGenerator { generator, .. } => {
                        // Since these load generators are single-output, the
                        // external reference uses the schema name for both
                        // namespace and name.
                        let name = FullItemName {
                            database: mz_sql::names::RawDatabaseSpecifier::Name(
                                mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                    .to_owned(),
                            ),
                            schema: generator.schema_name().to_string(),
                            item: generator.schema_name().to_string(),
                        };
                        Some(UnresolvedItemName::from(name))
                    }
                    _ => unreachable!("match determined above"),
                };

                // The new table statement, stealing the name and the
                // export-specific fields from the create source statement.
                let table = CreateTableFromSourceStatement {
                    name,
                    constraints: vec![],
                    columns,
                    if_not_exists: false,
                    source: source_ref,
                    external_reference,
                    with_options: table_with_options,
                    envelope,
                    include_metadata,
                    format,
                };

                // The source statement with a new name and many of its fields
                // emptied.
                let source = CreateSourceStatement {
                    connection: conn,
                    name: new_source_name,
                    if_not_exists,
                    in_cluster,
                    include_metadata: vec![],
                    format: None,
                    envelope: None,
                    col_names: vec![],
                    with_options,
                    key_constraint,
                    external_references,
                    progress_subsource,
                };

                let source_id = item.id;
                let source_global_id = item.global_id;
                let schema_id = item.schema_id.clone();
                let schema = tx.get_schema(&item.schema_id).expect("schema must exist");

                let owner_id = item.owner_id.clone();
                let privileges = item.privileges.clone();
                let extra_versions = item.extra_versions.clone();

                // Update the source statement in the catalog first, since its
                // name would otherwise conflict with the new table statement.
                info!("migrate: updated source {} to {source}", item.create_sql);
                item.create_sql = Statement::CreateSource(source).to_ast_string_stable();
                tx.update_item(item.id, item)?;

                // Insert the new table statement into the catalog with a new id.
                let ids = tx.allocate_user_item_ids(1)?;
                let (new_table_id, new_table_global_id) = ids[0];
                info!("migrate: added table {new_table_id}: {table}");
                tx.insert_user_item(
                    new_table_id,
                    new_table_global_id,
                    schema_id,
                    &table_item_name,
                    table.to_ast_string_stable(),
                    owner_id,
                    privileges,
                    &Default::default(),
                    extra_versions,
                )?;
                // We need to move the shard currently attached to the source
                // statement over to the table statement, so that the existing
                // data in the shard is preserved and can be queried on the new
                // table statement. However, we need to keep the GlobalId of
                // the source the same, to preserve existing references to that
                // statement in external tools such as dbt and Terraform. We
                // therefore map the source statement to a new ShardId; the
                // corresponding (empty) shard will be created automatically
                // after the migration is complete.
                let new_source_shard = ShardId::new();
                let (source_global_id, existing_source_shard) = tx
                    .delete_collection_metadata(btreeset! {source_global_id})
                    .pop()
                    .expect("shard should exist");
                tx.insert_collection_metadata(btreemap! {
                    new_table_global_id => existing_source_shard,
                    source_global_id => new_source_shard
                })?;

                add_to_audit_log(
                    tx,
                    mz_audit_log::EventType::Create,
                    mz_audit_log::ObjectType::Table,
                    mz_audit_log::EventDetails::IdFullNameV1(mz_audit_log::IdFullNameV1 {
                        id: new_table_id.to_string(),
                        name: mz_audit_log::FullNameV1 {
                            database: schema
                                .database_id
                                .map(|d| d.to_string())
                                .unwrap_or_default(),
                            schema: schema.name,
                            item: table_item_name,
                        },
                    }),
                    now(),
                )?;

                // We also need to update any other statements that reference
                // the source to use the new table id/name instead.
                changed_ids.insert(source_id, new_table_id);
            }

            // TODO(sql_server2): Consider how to migrate SQL Server subsources
            // to the source table world.
            Statement::CreateSource(CreateSourceStatement {
                connection: CreateSourceConnection::SqlServer { .. },
                ..
            }) => (),

            #[expect(unreachable_patterns)]
            Statement::CreateSource(_) => {}
            _ => (),
        }
    }

    let mut updated_items = BTreeMap::new();
    for (mut item, mut statement) in items_with_statements_copied {
        match &statement {
            // Don't rewrite any of the statements we just migrated.
            Statement::CreateSource(_) => {}
            Statement::CreateSubsource(_) => {}
            Statement::CreateTableFromSource(_) => {}
            // Rewrite any statements that reference a migrated source id to
            // use the new table id instead, since the data previously in the
            // source now lives in the table. This relies on the table having
            // stolen the source's name, which is the case for all sources that
            // were migrated.
            _ => {
                if mz_sql::names::modify_dependency_item_ids(&mut statement, &changed_ids) {
                    info!("migrate: updated dependency reference in statement {statement}");
                    item.create_sql = statement.to_ast_string_stable();
                    updated_items.insert(item.id, item);
                }
            }
        }
    }
    if !updated_items.is_empty() {
        tx.update_items(updated_items)?;
    }

    Ok(())
}

// Durable migrations

/// Migrations that run only on the durable catalog before any data is loaded
/// into memory.
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // Migrate the expression cache to a new shard. We're updating the keys to
    // use the explicit binary version instead of the deploy generation.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // Migrate the builtin migration shard to a new shard. We're updating the
    // keys to use the explicit binary version instead of the deploy generation.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }
    Ok(())
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.
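
// As an illustrative sketch only (the key and constant below are
// hypothetical): durable migrations typically follow the run-once pattern
// used in `durable_migrate` above, guarded by a config flag so they apply
// exactly once across restarts:
//
//     const MY_MIGRATION_KEY: &str = "my_migration";
//     const MY_MIGRATION_DONE: u64 = 1;
//     if tx.get_config(MY_MIGRATION_KEY.to_string()) != Some(MY_MIGRATION_DONE) {
//         // ... stage durable changes on `tx` ...
//         tx.set_config(MY_MIGRATION_KEY.to_string(), Some(MY_MIGRATION_DONE))?;
//     }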

fn add_to_audit_log(
    tx: &mut Transaction,
    event_type: mz_audit_log::EventType,
    object_type: mz_audit_log::ObjectType,
    details: mz_audit_log::EventDetails,
    occurred_at: mz_ore::now::EpochMillis,
) -> Result<(), anyhow::Error> {
    let id = tx.get_and_increment_id(mz_catalog::durable::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
    let event =
        mz_audit_log::VersionedEvent::new(id, event_type, object_type, details, None, occurred_at);
    tx.insert_audit_log_event(event);
    Ok(())
}

/// Removes the `PARTITION STRATEGY` option from `CREATE SINK` statements,
/// leaving the rest of each statement's `WITH (...)` options intact.
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}