mz_adapter/catalog/migrate.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use base64::prelude::*;
use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::{MOCK_AUTHENTICATION_NONCE_KEY, Transaction};
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{
    CreateSinkOptionName, CreateViewStatement, CteBlock, DeferredItemName, IfExistsBehavior, Query,
    SetExpr, SqlServerConfigOptionName, ViewDefinition,
};
use mz_sql::catalog::SessionCatalog;
use mz_sql::names::{FullItemName, QualifiedItemName};
use mz_sql::normalize;
use mz_sql::session::vars::{FORCE_SOURCE_TABLE_SYNTAX, Var, VarInput};
use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::sources::SourceExportStatementDetails;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use prost::Message;
use semver::Version;
use tracing::info;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};
/// Catalog key of the `migration_version` setting.
///
/// The `migration_version` tracks the version of the binary that last successfully performed and
/// committed all the catalog migrations (including builtin schema migrations). It can be used by
/// migration logic to identify the source version from which to migrate.
///
/// Note that the durable catalog also knows a `catalog_content_version`. That doesn't work for
/// this purpose as it is already bumped to the current binary version when the catalog is opened
/// in writable mode, before any migrations have run.
const MIGRATION_VERSION_KEY: &str = "migration_version";

pub(crate) fn get_migration_version(txn: &Transaction<'_>) -> Option<Version> {
    txn.get_setting(MIGRATION_VERSION_KEY.into())
        .map(|s| s.parse().expect("valid migration version"))
}

pub(crate) fn set_migration_version(
    txn: &mut Transaction<'_>,
    version: Version,
) -> Result<(), mz_catalog::durable::CatalogError> {
    txn.set_setting(MIGRATION_VERSION_KEY.into(), Some(version.to_string()))
}

fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

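        // Re-serialize the (possibly updated) statement back into the item. Note that
        // every item is staged and written back below, even if `f` left its statement
        // untouched.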
        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a &ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, &cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

pub(crate) struct MigrateResult {
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

/// Migrates all user items and loads them into `state`.
///
/// Returns the builtin updates corresponding to all user items.
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    _now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = get_migration_version(tx).unwrap_or(Version::new(0, 0, 0));

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    rewrite_ast_items(tx, |_tx, _id, stmt| {
        // Add per-item AST migrations below.
        //
        // Each migration should be a function that takes `stmt` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `stmt` will be staged for commit to the catalog.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        Ok(())
    })?;

    // Load items into catalog. We make sure to consolidate the old updates with the new updates to
    // avoid trying to apply unmigrated items.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Some migrations might introduce updates that are not item updates. We sequester those
    // so they can be applied with the other post-item updates after the migrations, to avoid
    // accumulating negative diffs.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        // The only post-item update kind we currently generate is to
        // update storage collection metadata.
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();

    let force_source_table_syntax = state.system_config().force_source_table_syntax();
    // When this flag is set, the legacy syntax is rejected. But we are about to perform a
    // migration that requires parsing the current catalog state, so we temporarily disable
    // the flag and reset it after the migrations are done.
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("off"))
            .expect("known parameter");
    }

    let mut ast_builtin_table_updates = state
        .apply_updates_for_bootstrap(item_updates, local_expr_cache)
        .await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    // Special block for the `ast_rewrite_sources_to_tables` migration, since it is gated
    // behind a feature flag and needs to update multiple AST items at once.
    if force_source_table_syntax {
        rewrite_sources_to_tables(tx, &conn_cat)?;
    }

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        // Add per-item, post-planning AST migrations below. Most
        // migrations should be in the above `rewrite_ast_items` block.
        //
        // Each migration should be a function that takes `item` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `item` will be staged for commit to the catalog.
        //
        // Be careful if you reference `conn_cat`. Doing so is *weird*,
        // as you'll be rewriting the catalog while looking at it. If
        // possible, make your migration independent of `conn_cat`, and only
        // consider a single item at a time.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        Ok(())
    })?;

    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("on"))
            .expect("known parameter");
    }

    // Add whole-catalog migrations below.
    //
    // Each migration should be a function that takes `tx` and `conn_cat` as
    // input and stages arbitrary transformations to the catalog on `tx`.
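    //
    // For example, a hypothetical whole-catalog migration would be invoked here as:
    //
    //     whole_catalog_rename_something_v0_130(tx, &conn_cat)?;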

    let op_item_updates = tx.get_and_commit_op_updates();
    let item_builtin_table_updates = state
        .apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
        .await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );
    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        post_item_updates,
    })
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.
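//
// For example, a hypothetical AST migration that removes a deprecated sink option in
// v0.130 might be named `ast_rewrite_remove_deprecated_sink_option_v0_130`.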

/// Migrates all sources to use the new sources-as-tables model.
///
/// Suppose we have an old-style source named `source_name` with global id `source_id`. The source
/// will also have an associated progress source named `progress_name` (which is almost always
/// `source_name` + "_progress") with global id `progress_id`.
///
/// We have two constraints to satisfy. The migration:
///   1. should not change the schema of a global id *if that global id maps to a
///      durable collection*. The reason for this constraint is that when a durable collection
///      (i.e. one backed by a persist shard) is opened, persist will verify that the schema is
///      the expected one. If we change the Create SQL of a global id to a non-durable definition
///      (e.g. a view) then we are free to also change the schema.
///   2. should make it such that the SQL object that is constructed with a new-style `CREATE
///      SOURCE` statement contains the progress data and all other objects related to the
///      old-style source depend on that object.
///
/// With these constraints we consider two cases.
///
/// ## Case 1: A multi-output source
///
/// For multi-output sources, the contents of `source_name` are a useless dummy output. So we
/// re-purpose that name for the new-style `CREATE SOURCE` statement and make `progress_name` a
/// view of `source_name`. Since the main source is a durable object, we must move `source_name`
/// and the corresponding new-style `CREATE SOURCE` statement under `progress_id`. Then
/// `progress_name` can move to `source_id`, and since it becomes a view we are free to change its
/// schema.
///
/// Visually, we are changing this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                | Schema   | Durable |
/// +-------------+---------------+---------------------------+----------+---------+
/// | source_id   | source_name   | CREATE SOURCE (old-style) | empty    | yes     |
/// | progress_id | progress_name | CREATE SUBSOURCE ..       | progress | yes     |
///
/// to this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                | Schema        | Durable |
/// +-------------+---------------+---------------------------+---------------+---------+
/// | source_id   | progress_name | CREATE VIEW               | progress data | no      |
/// | progress_id | source_name   | CREATE SOURCE (new-style) | progress data | yes     |
///
/// ## Case 2: A single-output source
///
/// Single-output sources have data as the contents of `source_name` and so we can't repurpose that
/// name to be the `CREATE SOURCE` statement. Here we leave everything intact except for the
/// Create SQL of each object. Namely, the old-style `CREATE SOURCE` statement becomes a `CREATE
/// TABLE FROM SOURCE` and the old-style `CREATE SUBSOURCE .. PROGRESS` becomes a new-style `CREATE
/// SOURCE` statement.
///
/// Visually, we are changing this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                | Schema      | Durable |
/// +-------------+---------------+---------------------------+-------------+---------+
/// | source_id   | source_name   | CREATE SOURCE (old-style) | source data | yes     |
/// | progress_id | progress_name | CREATE SUBSOURCE ..       | progress    | yes     |
///
/// to this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                | Schema      | Durable |
/// +-------------+---------------+---------------------------+-------------+---------+
/// | source_id   | source_name   | CREATE TABLE FROM SOURCE  | source data | yes     |
/// | progress_id | progress_name | CREATE SOURCE (new-style) | progress    | yes     |
///
/// ## Subsource migration
///
/// After going over all the `CREATE SOURCE` statements, the migration transforms each
/// non-progress `CREATE SUBSOURCE` statement into a `CREATE TABLE FROM SOURCE` statement that
/// points to the original `source_name`, but with the altered global id (which is now
/// `progress_id`).
fn rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    catalog: &ConnCatalog<'_>,
) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };

    let mut updated_items = BTreeMap::new();

    let mut sources = vec![];
    let mut subsources = vec![];

    for item in tx.get_items() {
        let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        match stmt {
            Statement::CreateSubsource(stmt) => subsources.push((item, stmt)),
            Statement::CreateSource(stmt) => sources.push((item, stmt)),
            _ => {}
        }
    }

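    // `pending_progress_items` maps each progress subsource's id to the (new name, new
    // `CREATE SOURCE` statement) that replaces it in the subsource pass below;
    // `migrated_source_ids` maps a multi-output source's old id to the id that now owns
    // its `CREATE SOURCE` statement, so that subsources can be re-pointed at it.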
    let mut pending_progress_items = BTreeMap::new();
    let mut migrated_source_ids = BTreeMap::new();
    // We first go over the sources, whose kind determines what happens with the
    // progress statements.
    for (mut source_item, source_stmt) in sources {
        let CreateSourceStatement {
            name,
            in_cluster,
            col_names,
            mut connection,
            include_metadata,
            format,
            envelope,
            if_not_exists,
            key_constraint,
            with_options,
            external_references,
            progress_subsource,
        } = source_stmt;

        let (progress_name, progress_item) = match progress_subsource {
            Some(DeferredItemName::Named(RawItemName::Name(name))) => {
                let partial_name = normalize::unresolved_item_name(name.clone())?;
                (name, catalog.resolve_item(&partial_name)?)
            }
            Some(DeferredItemName::Named(RawItemName::Id(id, name, _))) => {
                let gid = id.parse()?;
                (name, catalog.get_item(&gid))
            }
            Some(DeferredItemName::Deferred(_)) => {
                unreachable!("invalid progress subsource")
            }
            None => {
                info!("migrate: skipping already migrated source: {name}");
                continue;
            }
        };
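        // Build an id-based reference to the progress item. Single-output sources use this
        // as the `source` of their new `CREATE TABLE .. FROM SOURCE` statement, since the
        // new-style `CREATE SOURCE` statement ends up living under the progress item's id.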
        let raw_progress_name =
            RawItemName::Id(progress_item.id().to_string(), progress_name.clone(), None);

        // We need to jump through some hoops to get to the raw item name of the source
        let catalog_item = catalog.get_item(&source_item.id);
        let source_name: &QualifiedItemName = catalog_item.name();
        let full_source_name: FullItemName = catalog.resolve_full_name(source_name);
        let source_name: UnresolvedItemName = normalize::unresolve(full_source_name.clone());

        // First, strip the connection options that we no longer need
        match &mut connection {
            CreateSourceConnection::Postgres { options, .. } => {
                options.retain(|o| match o.name {
                    PgConfigOptionName::Details | PgConfigOptionName::Publication => true,
                    PgConfigOptionName::TextColumns | PgConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::SqlServer { options, .. } => {
                options.retain(|o| match o.name {
                    SqlServerConfigOptionName::Details => true,
                    SqlServerConfigOptionName::TextColumns
                    | SqlServerConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::MySql { options, .. } => {
                options.retain(|o| match o.name {
                    MySqlConfigOptionName::Details => true,
                    MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns => {
                        false
                    }
                });
            }
            CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. } => {
            }
        }

        // Then, figure out the new statements for the progress and source.
        let (new_progress_name, new_progress_stmt, new_source_name, new_source_stmt) =
            match connection {
                connection @ (CreateSourceConnection::Postgres { .. }
                | CreateSourceConnection::MySql { .. }
                | CreateSourceConnection::SqlServer { .. }
                | CreateSourceConnection::LoadGenerator {
                    generator:
                        LoadGenerator::Tpch | LoadGenerator::Auction | LoadGenerator::Marketing,
                    ..
                }) => {
                    // Assert the expected state of the source
                    assert_eq!(col_names, &[]);
                    assert_eq!(key_constraint, None);
                    assert_eq!(format, None);
                    assert_eq!(envelope, None);
                    assert_eq!(include_metadata, &[]);
                    assert_eq!(external_references, None);

                    // This is a dummy replacement statement for the source object of multi-output
                    // sources. It describes the query `TABLE source_name`. This ensures that
                    // anyone who used to run select queries against the `source_name` + "_progress"
                    // object still gets the same data after the migration. This switch does
                    // change the schema of the object with `source_item.id`, but because we're
                    // turning it into a view, which is not durable, that's ok. We'll never open a
                    // persist shard for this global id again.
                    let dummy_source_stmt = Statement::CreateView(CreateViewStatement {
                        if_exists: IfExistsBehavior::Error,
                        temporary: false,
                        definition: ViewDefinition {
                            name: progress_name,
                            columns: vec![],
                            query: Query {
                                ctes: CteBlock::Simple(vec![]),
                                body: SetExpr::Table(RawItemName::Id(
                                    progress_item.id().to_string(),
                                    source_name.clone(),
                                    None,
                                )),
                                order_by: vec![],
                                limit: None,
                                offset: None,
                            },
                        },
                    });

                    let new_progress_stmt = CreateSourceStatement {
                        name: source_name.clone(),
                        in_cluster,
                        col_names: vec![],
                        connection,
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };

                    migrated_source_ids.insert(source_item.id, progress_item.id());

                    (
                        full_source_name.item,
                        new_progress_stmt,
                        progress_item.name().item.clone(),
                        dummy_source_stmt,
                    )
                }
                CreateSourceConnection::Kafka {
                    options,
                    connection,
                } => {
                    let constraints = if let Some(_key_constraint) = key_constraint {
                        // The `PRIMARY KEY .. NOT ENFORCED` feature is not enabled for anyone.
                        // TODO: remove the feature altogether
                        vec![]
                    } else {
                        vec![]
                    };

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // All source tables must have a `details` option, which is a serialized proto
                    // describing any source-specific details for this table statement.
                    let details = SourceExportStatementDetails::Kafka {};
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
                    // The external reference for a kafka source is just the topic name
                    let topic_option = options
                        .iter()
                        .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                        .expect("kafka sources must have a topic");
                    let topic = match &topic_option.value {
                        Some(WithOptionValue::Value(Value::String(topic))) => topic,
                        _ => unreachable!("topic must be a string"),
                    };
                    let external_reference = UnresolvedItemName::qualified(&[Ident::new(topic)?]);

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::Kafka {
                            options,
                            connection,
                        },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
                CreateSourceConnection::LoadGenerator {
                    generator:
                        generator @ (LoadGenerator::Clock
                        | LoadGenerator::Counter
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue),
                    options,
                } => {
                    let constraints = if let Some(_key_constraint) = key_constraint {
                        // Should we ignore not enforced primary key constraints here?
                        vec![]
                    } else {
                        vec![]
                    };

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // All source tables must have a `details` option, which is a serialized proto
                    // describing any source-specific details for this table statement.
                    let details = SourceExportStatementDetails::LoadGenerator {
                        output: LoadGeneratorOutput::Default,
                    };
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
                    // Since these load generators are single-output, the external reference
                    // uses the schema name for both the namespace and the name.
                    let external_reference = FullItemName {
                        database: mz_sql::names::RawDatabaseSpecifier::Name(
                            mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                .to_owned(),
                        ),
                        schema: generator.schema_name().to_string(),
                        item: generator.schema_name().to_string(),
                    };

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference.into()),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::LoadGenerator { generator, options },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
            };

        // The source can be updated right away but the replacement progress statement will
        // be installed in the next loop where we go over subsources.

        info!(
            "migrate: converted source {} to {}",
            source_item.create_sql, new_source_stmt
        );
        source_item.name = new_source_name.clone();
        source_item.create_sql = new_source_stmt.to_ast_string_stable();
        updated_items.insert(source_item.id, source_item);
        pending_progress_items.insert(progress_item.id(), (new_progress_name, new_progress_stmt));
    }

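    // Second pass: install the replacement progress statements computed above and convert
    // every remaining subsource into a `CREATE TABLE .. FROM SOURCE` statement.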
    for (mut item, stmt) in subsources {
        match stmt {
            // Migrate progress statements to the corresponding statement produced from the
            // previous step.
            CreateSubsourceStatement {
                of_source: None, ..
            } => {
                let Some((new_name, new_stmt)) = pending_progress_items.remove(&item.id) else {
                    panic!("encountered orphan progress subsource id: {}", item.id)
                };
                item.name = new_name;
                item.create_sql = new_stmt.to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
            // Migrate each `CREATE SUBSOURCE` statement to an equivalent
            // `CREATE TABLE ... FROM SOURCE` statement.
            CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source: Some(raw_source_name),
                if_not_exists,
                mut with_options,
            } => {
                let new_raw_source_name = match raw_source_name {
                    RawItemName::Id(old_id, name, None) => {
                        let old_id: CatalogItemId = old_id.parse().expect("well formed");
                        let new_id = migrated_source_ids[&old_id].clone();
                        RawItemName::Id(new_id.to_string(), name, None)
                    }
                    _ => unreachable!("unexpected source name: {raw_source_name}"),
                };
                // The external reference is a `with_option` on subsource statements but is a
                // separate field on table statements.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                // The `details` option on both subsources and tables is identical, using the same
                                // ProtoSourceExportStatementDetails serialized value.
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                                name: TableFromSourceOptionName::RetainHistory,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("This option is handled separately above.")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source: new_raw_source_name,
                    external_reference: Some(external_reference),
                    with_options,
                    // Subsources don't have `envelope`, `include_metadata`, or `format` options.
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
        }
    }
    assert!(
        pending_progress_items.is_empty(),
        "unexpected residual progress items: {pending_progress_items:?}"
    );

    tx.update_items(updated_items)?;

    Ok(())
}

// Durable migrations

/// Migrations that run only on the durable catalog before any data is loaded into memory.
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // Migrate the expression cache to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // Migrate the builtin migration shard to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }

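    // If no mock authentication nonce has been stored yet, generate a random one,
    // base64-encode it, and persist it as a setting.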
    if tx
        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
        .is_none()
    {
        let mut nonce = [0u8; 24];
        openssl::rand::rand_bytes(&mut nonce).expect("failed to generate nonce");
        let nonce = BASE64_STANDARD.encode(nonce);
        tx.set_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string(), Some(nonce))?;
    }

    Ok(())
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.

// Remove PARTITION STRATEGY from CREATE SINK statements.
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}