mz_adapter/catalog/migrate.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use base64::prelude::*;
use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::{MOCK_AUTHENTICATION_NONCE_KEY, Transaction};
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::CreateSinkOptionName;
use mz_sql::ast::display::AstDisplay;
use mz_sql::names::FullItemName;
use mz_sql_parser::ast::{IdentError, Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use semver::Version;
use tracing::info;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};

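/// Parses the `create_sql` of every item in the catalog, applies `f` to the
/// resulting raw AST, and stages the (possibly mutated) statement back onto
/// `tx` via `update_items`.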
fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

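/// Like [`rewrite_ast_items`], but additionally passes a [`ConnCatalog`] to `f`
/// so migrations can consult the in-memory catalog state while rewriting each
/// item's raw AST.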
fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a &ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, &cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

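/// The result of running [`migrate`]: builtin table updates for all user items,
/// plus any non-item ("post-item") updates that should be applied after the
/// item updates.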
pub(crate) struct MigrateResult {
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

/// Migrates all user items and loads them into `state`.
///
/// Returns the builtin updates corresponding to all user items.
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = tx.get_catalog_content_version();
    let catalog_version = match catalog_version {
        Some(v) => Version::parse(v)?,
        None => Version::new(0, 0, 0),
    };

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    // Special block for the `ast_rewrite_sources_to_tables` migration, since it is
    // gated behind a feature flag and needs to update multiple AST items at once.
    if state.system_config().force_source_table_syntax() {
        ast_rewrite_sources_to_tables(tx, now)?;
    }

    rewrite_ast_items(tx, |_tx, _id, stmt| {
        // Add per-item AST migrations below.
        //
        // Each migration should be a function that takes `stmt` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `stmt` will be staged for commit to the catalog.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        Ok(())
    })?;

    // Load items into catalog. We make sure to consolidate the old updates with the new updates to
    // avoid trying to apply unmigrated items.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Since some migrations might introduce updates that are not item updates
    // ("post-item" updates), we set those aside so they can be applied together with
    // the other post-item updates after migrations, to avoid accumulating negative diffs.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        // The only post-item update kind we currently generate is to
        // update storage collection metadata.
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();
    let mut ast_builtin_table_updates = state
        .apply_updates_for_bootstrap(item_updates, local_expr_cache)
        .await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        // Add per-item, post-planning AST migrations below. Most
        // migrations should be in the above `rewrite_ast_items` block.
        //
        // Each migration should be a function that takes `item` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `item` will be staged for commit to the catalog.
        //
        // Be careful if you reference `conn_cat`. Doing so is *weird*,
        // as you'll be rewriting the catalog while looking at it. If
        // possible, make your migration independent of `conn_cat`, and only
        // consider a single item at a time.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        Ok(())
    })?;

    // Add whole-catalog migrations below.
    //
    // Each migration should be a function that takes `tx` and `conn_cat` as
    // input and stages arbitrary transformations to the catalog on `tx`.
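    //
    // Purely as an illustration (this is not a real migration), such a function
    // would be defined elsewhere in this module and invoked here roughly as
    //
    //     catalog_rewrite_example_v0_0_0(tx, &conn_cat)?;
    //
    // where `catalog_rewrite_example_v0_0_0` is a hypothetical name following the
    // naming convention described further below, and stages all of its changes on `tx`.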

    let op_item_updates = tx.get_and_commit_op_updates();
    let item_builtin_table_updates = state
        .apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
        .await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );
    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        post_item_updates,
    })
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.
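//
// As a purely hypothetical sketch (not an actual migration), a per-item AST
// migration following this convention might look like:
//
//     /// Rewrites deprecated `FOO` sink options to `BAR` (hypothetical).
//     fn ast_rewrite_sink_foo_to_bar_v0_1_0(
//         stmt: &mut Statement<Raw>,
//     ) -> Result<(), anyhow::Error> {
//         // Inspect and mutate `stmt` in place; the caller re-serializes it.
//         Ok(())
//     }
//
// and would be invoked from the `rewrite_ast_items` closure in `migrate` above.
// See `ast_rewrite_create_sink_partition_strategy` at the bottom of this file
// for a real migration of this shape.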

/// Migrates all sources to use the new sources-as-tables model.
///
/// First we migrate existing `CREATE SUBSOURCE` statements, turning them into
/// `CREATE TABLE .. FROM SOURCE` statements. This covers existing Postgres,
/// MySQL, and multi-output (tpch, auction, marketing) load-generator subsources.
///
/// Second we migrate existing `CREATE SOURCE` statements for these multi-output
/// sources to remove any subsource-specific options (e.g. TEXT COLUMNS).
///
/// Third we migrate existing single-output `CREATE SOURCE` statements.
/// This includes existing Kafka and single-output load-generator
/// sources. This will generate an additional `CREATE TABLE .. FROM SOURCE`
/// statement that copies over all the export-specific options. This table will take
/// over the existing source statement's persist shard but use a new GlobalId.
/// The original source statement will have its export-specific options removed, will
/// be renamed to `<original_name>_source`, and will use a new empty shard, while
/// keeping its same GlobalId. For example, a source named `kafka_src` keeps its
/// CatalogItemId and GlobalId but becomes `kafka_src_source`, while a new table named
/// `kafka_src` takes over the source's original persist shard (and thus its data).
fn ast_rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    now: NowFn,
) -> Result<(), anyhow::Error> {
    use maplit::btreemap;
    use maplit::btreeset;
    use mz_persist_types::ShardId;
    use mz_proto::RustType;
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };
    use mz_storage_client::controller::StorageTxn;
    use mz_storage_types::sources::SourceExportStatementDetails;
    use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
    use prost::Message;

    let items_with_statements = tx
        .get_items()
        .map(|item| {
            let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
            Ok((item, stmt))
        })
        .collect::<Result<Vec<_>, anyhow::Error>>()?;
    let items_with_statements_copied = items_with_statements.clone();

    let item_names_per_schema = items_with_statements_copied
        .iter()
        .map(|(item, _)| (item.schema_id.clone(), &item.name))
        .fold(BTreeMap::new(), |mut acc, (schema_id, name)| {
            acc.entry(schema_id)
                .or_insert_with(|| btreeset! {})
                .insert(name);
            acc
        });

    // Any CatalogItemId that should be changed to a new CatalogItemId in any statements that
    // reference it. This is necessary for ensuring downstream statements (e.g.
    // mat views, indexes) that reference a single-output source (e.g. kafka)
    // will now reference the corresponding new table, with the same data, instead.
    let mut changed_ids = BTreeMap::new();

    for (mut item, stmt) in items_with_statements {
        match stmt {
            // Migrate each `CREATE SUBSOURCE` statement to an equivalent
            // `CREATE TABLE ... FROM SOURCE` statement.
            Statement::CreateSubsource(CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source,
                if_not_exists,
                mut with_options,
            }) => {
                let raw_source_name = match of_source {
                    // If `of_source` is None then this is a `progress` subsource which we
                    // are not migrating as they are not currently relevant to the new table model.
                    None => continue,
                    Some(name) => name,
                };
                let source = match raw_source_name {
                    // Some legacy subsources have name-only references to their `of_source`,
                    // so we ensure we always use an ID-based reference in the stored
                    // `CREATE TABLE ... FROM SOURCE` statements.
                    RawItemName::Name(name) => {
                        // Convert the name reference to an ID reference.
                        let (source_item, _) = items_with_statements_copied
                            .iter()
                            .find(|(_, statement)| match statement {
                                Statement::CreateSource(stmt) => stmt.name == name,
                                _ => false,
                            })
                            .expect("source must exist");
                        RawItemName::Id(source_item.id.to_string(), name, None)
                    }
                    RawItemName::Id(..) => raw_source_name,
                };

                // The external reference is a `with_option` on subsource statements but is a
                // separate field on table statements.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                // The `details` option on both subsources and tables is identical, using the same
                                // ProtoSourceExportStatementDetails serialized value.
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                                name: TableFromSourceOptionName::RetainHistory,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("This option is handled separately above.")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: mz_sql::ast::TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source,
                    external_reference: Some(external_reference.clone()),
                    with_options,
                    // Subsources don't have `envelope`, `include_metadata`, or `format` options.
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                tx.update_item(item.id, item)?;
            }

            // Postgres sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection: mut conn @ CreateSourceConnection::Postgres { .. },
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::Postgres { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // This option, which stores text columns on the primary source statement, is
                // redundant with the option on subsource statements, so it can simply be removed.
                // It was kept for round-tripping of `CREATE SOURCE` statements that automatically
                // generated subsources, which is no longer necessary.
                if options
                    .iter()
                    .any(|o| matches!(o.name, PgConfigOptionName::TextColumns))
                {
                    options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted postgres source {stmt} to remove subsource options");
                }
            }
            // MySQL sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection: mut conn @ CreateSourceConnection::MySql { .. },
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
                ..
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::MySql { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // These options, which store text and exclude columns on the primary source
                // statement, are redundant with the options on subsource statements, so they can
                // simply be removed. They were kept for round-tripping of `CREATE SOURCE` statements
                // that automatically generated subsources, which is no longer necessary.
                if options.iter().any(|o| {
                    matches!(
                        o.name,
                        MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns
                    )
                }) {
                    options.retain(|o| {
                        !matches!(
                            o.name,
                            MySqlConfigOptionName::TextColumns
                                | MySqlConfigOptionName::ExcludeColumns
                        )
                    });
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted mysql source {stmt} to remove subsource options");
                }
            }
            // Multi-output load generator sources whose subsources are already
            // migrated above. There is no need to remove any options from this
            // statement since they are not export-specific.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Auction | LoadGenerator::Marketing | LoadGenerator::Tpch,
                        ..
                    },
                ..
            }) => {}
            // Single-output sources that need to be migrated to tables. These sources currently output
            // data to the primary collection of the source statement. We will create a new table
            // statement for them and move all export-specific options over from the source statement,
            // while moving the `CREATE SOURCE` statement to a new name and moving its shard to the
            // new table statement.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    conn @ (CreateSourceConnection::Kafka { .. }
                    | CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Clock
                            | LoadGenerator::Datums
                            | LoadGenerator::Counter
                            | LoadGenerator::KeyValue,
                        ..
                    }),
                name,
                col_names,
                include_metadata,
                format,
                envelope,
                with_options,
                if_not_exists,
                in_cluster,
                progress_subsource,
                external_references,
                key_constraint,
            }) => {
                // To check if this is a source that has already been migrated we use a basic
                // heuristic: if there is at least one existing table for the source, and if
                // the envelope/format/include_metadata options are empty, we assume it's
                // already been migrated.
                let tables_for_source =
                    items_with_statements_copied
                        .iter()
                        .any(|(_, statement)| match statement {
                            Statement::CreateTableFromSource(stmt) => {
                                let source: CatalogItemId = match &stmt.source {
                                    RawItemName::Name(_) => {
                                        unreachable!("tables store source as ID")
                                    }
                                    RawItemName::Id(source_id, _, _) => {
                                        source_id.parse().expect("valid id")
                                    }
                                };
                                source == item.id
                            }
                            _ => false,
                        });
                if tables_for_source
                    && envelope.is_none()
                    && format.is_none()
                    && include_metadata.is_empty()
                {
                    info!("migrate: skipping already migrated source: {}", name);
                    continue;
                }

                // Use the current source name as the new table name, and rename the source to
                // `<source_name>_source`. Because the table takes over the source's original name
                // (and its data), existing queries that reference the source name will continue
                // to work, now reading from the table.

                assert_eq!(
                    item.name,
                    name.0.last().expect("at least one ident").to_string()
                );
                // First find an unused name within the same schema to avoid conflicts.
                let is_valid = |new_source_ident: &Ident| {
                    if item_names_per_schema
                        .get(&item.schema_id)
                        .expect("schema must exist")
                        .contains(&new_source_ident.to_string())
                    {
                        Ok::<_, IdentError>(false)
                    } else {
                        Ok(true)
                    }
                };
                let new_source_ident =
                    Ident::try_generate_name(item.name.clone(), "_source", is_valid)?;

                // We will use the original item name for the new table item.
                let table_item_name = item.name.clone();

                // Update the source item/statement to use the new name.
                let mut new_source_name = name.clone();
                *new_source_name.0.last_mut().expect("at least one ident") =
                    new_source_ident.clone();
                item.name = new_source_ident.to_string();

                // A reference to the source that will be included in the table statement
                let source_ref =
                    RawItemName::Id(item.id.to_string(), new_source_name.clone(), None);

                let columns = if col_names.is_empty() {
                    TableFromSourceColumns::NotSpecified
                } else {
                    TableFromSourceColumns::Named(col_names)
                };

                // All source tables must have a `details` option, which is a serialized proto
                // describing any source-specific details for this table statement.
                let details = match &conn {
                    // For kafka sources this proto is currently empty.
                    CreateSourceConnection::Kafka { .. } => SourceExportStatementDetails::Kafka {},
                    CreateSourceConnection::LoadGenerator { .. } => {
                        // Since these load generators are single-output we use the default output.
                        SourceExportStatementDetails::LoadGenerator {
                            output: LoadGeneratorOutput::Default,
                        }
                    }
                    _ => unreachable!("match determined above"),
                };
                let table_with_options = vec![TableFromSourceOption {
                    name: TableFromSourceOptionName::Details,
                    value: Some(WithOptionValue::Value(Value::String(hex::encode(
                        details.into_proto().encode_to_vec(),
                    )))),
                }];

                // Generate the same external-reference that would have been generated
                // during purification for single-output sources.
                let external_reference = match &conn {
                    CreateSourceConnection::Kafka { options, .. } => {
                        let topic_option = options
                            .iter()
                            .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                            .expect("kafka sources must have a topic");
                        let topic = match &topic_option.value {
                            Some(WithOptionValue::Value(Value::String(topic))) => topic,
                            _ => unreachable!("topic must be a string"),
                        };

                        Some(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
                    }
                    CreateSourceConnection::LoadGenerator { generator, .. } => {
                        // Since these load generators are single-output the external reference
                        // uses the schema-name for both namespace and name.
                        let name = FullItemName {
                                database: mz_sql::names::RawDatabaseSpecifier::Name(
                                    mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                        .to_owned(),
                                ),
                                schema: generator.schema_name().to_string(),
                                item: generator.schema_name().to_string(),
                            };
                        Some(UnresolvedItemName::from(name))
                    }
                    _ => unreachable!("match determined above"),
                };

                // The new table statement, stealing the name and the export-specific fields from
                // the create source statement.
                let table = CreateTableFromSourceStatement {
                    name,
                    constraints: vec![],
                    columns,
                    if_not_exists: false,
                    source: source_ref,
                    external_reference,
                    with_options: table_with_options,
                    envelope,
                    include_metadata,
                    format,
                };

                // The source statement with a new name and many of its fields emptied
                let source = CreateSourceStatement {
                    connection: conn,
                    name: new_source_name,
                    if_not_exists,
                    in_cluster,
                    include_metadata: vec![],
                    format: None,
                    envelope: None,
                    col_names: vec![],
                    with_options,
                    key_constraint,
                    external_references,
                    progress_subsource,
                };

                let source_id = item.id;
                let source_global_id = item.global_id;
                let schema_id = item.schema_id.clone();
                let schema = tx.get_schema(&item.schema_id).expect("schema must exist");

                let owner_id = item.owner_id.clone();
                let privileges = item.privileges.clone();
                let extra_versions = item.extra_versions.clone();

                // Update the source statement in the catalog first, since the name will
                // otherwise conflict with the new table statement.
                info!("migrate: updated source {} to {source}", item.create_sql);
                item.create_sql = Statement::CreateSource(source).to_ast_string_stable();
                tx.update_item(item.id, item)?;

                // Insert the new table statement into the catalog with a new id.
                let ids = tx.allocate_user_item_ids(1)?;
                let (new_table_id, new_table_global_id) = ids[0];
                info!("migrate: added table {new_table_id}: {table}");
                tx.insert_user_item(
                    new_table_id,
                    new_table_global_id,
                    schema_id,
                    &table_item_name,
                    table.to_ast_string_stable(),
                    owner_id,
                    privileges,
                    &Default::default(),
                    extra_versions,
                )?;
                // We need to move the shard currently attached to the source statement to the
                // table statement such that the existing data in the shard is preserved and can
                // be queried on the new table statement. However, we need to keep the GlobalId of
                // the source the same, to preserve existing references to that statement in
                // external tools such as DBT and Terraform. We will insert a new shard for the source
                // statement which will be automatically created after the migration is complete.
                let new_source_shard = ShardId::new();
                let (source_global_id, existing_source_shard) = tx
                    .delete_collection_metadata(btreeset! {source_global_id})
                    .pop()
                    .expect("shard should exist");
                tx.insert_collection_metadata(btreemap! {
                    new_table_global_id => existing_source_shard,
                    source_global_id => new_source_shard
                })?;

                add_to_audit_log(
                    tx,
                    mz_audit_log::EventType::Create,
                    mz_audit_log::ObjectType::Table,
                    mz_audit_log::EventDetails::IdFullNameV1(mz_audit_log::IdFullNameV1 {
                        id: new_table_id.to_string(),
                        name: mz_audit_log::FullNameV1 {
                            database: schema
                                .database_id
                                .map(|d| d.to_string())
                                .unwrap_or_default(),
                            schema: schema.name,
                            item: table_item_name,
                        },
                    }),
                    now(),
                )?;

                // We also need to update any other statements that reference the source to use the new
                // table id/name instead.
                changed_ids.insert(source_id, new_table_id);
            }

            // TODO(sql_server2): Consider how to migrate SQL Server subsources
            // to the source table world.
            Statement::CreateSource(CreateSourceStatement {
                connection: CreateSourceConnection::SqlServer { .. },
                ..
            }) => (),

            #[expect(unreachable_patterns)]
            Statement::CreateSource(_) => {}
            _ => (),
        }
    }

    let mut updated_items = BTreeMap::new();
    for (mut item, mut statement) in items_with_statements_copied {
        match &statement {
            // Don’t rewrite any of the statements we just migrated.
            Statement::CreateSource(_) => {}
            Statement::CreateSubsource(_) => {}
            Statement::CreateTableFromSource(_) => {}
            // We need to rewrite any statements that reference a source id to use the new
            // table id instead, since any contained data in the source will now be in the table.
            // This assumes the table has stolen the source's name, which is the case
            // for all sources that were migrated.
            _ => {
                if mz_sql::names::modify_dependency_item_ids(&mut statement, &changed_ids) {
                    info!("migrate: updated dependency reference in statement {statement}");
                    item.create_sql = statement.to_ast_string_stable();
                    updated_items.insert(item.id, item);
                }
            }
        }
    }
    if !updated_items.is_empty() {
        tx.update_items(updated_items)?;
    }

    Ok(())
}

// Durable migrations

/// Migrations that run only on the durable catalog before any data is loaded into memory.
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // Migrate the expression cache to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // Migrate the builtin migration shard to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }

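    // Generate and durably store a random nonce for mock authentication if one
    // has not been stored yet.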
    if tx
        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
        .is_none()
    {
        let mut nonce = [0u8; 24];
        let _ = openssl::rand::rand_bytes(&mut nonce).expect("failed to generate nonce");
        let nonce = BASE64_STANDARD.encode(nonce);
        tx.set_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string(), Some(nonce))?;
    }

    Ok(())
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.

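/// Appends an audit log event to the durable catalog, allocating the next
/// available audit log ID for it.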
fn add_to_audit_log(
    tx: &mut Transaction,
    event_type: mz_audit_log::EventType,
    object_type: mz_audit_log::ObjectType,
    details: mz_audit_log::EventDetails,
    occurred_at: mz_ore::now::EpochMillis,
) -> Result<(), anyhow::Error> {
    let id = tx.get_and_increment_id(mz_catalog::durable::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
    let event =
        mz_audit_log::VersionedEvent::new(id, event_type, object_type, details, None, occurred_at);
    tx.insert_audit_log_event(event);
    Ok(())
}

// Remove PARTITION STRATEGY from CREATE SINK statements.
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}