mz_adapter/catalog/migrate.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use base64::prelude::*;
use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::{MOCK_AUTHENTICATION_NONCE_KEY, Transaction};
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{
    CreateSinkOptionName, CreateViewStatement, CteBlock, DeferredItemName, IfExistsBehavior, Query,
    SetExpr, SqlServerConfigOptionName, ViewDefinition,
};
use mz_sql::catalog::SessionCatalog;
use mz_sql::names::{FullItemName, QualifiedItemName};
use mz_sql::normalize;
use mz_sql::session::vars::{FORCE_SOURCE_TABLE_SYNTAX, Var, VarInput};
use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::sources::SourceExportStatementDetails;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use prost::Message;
use semver::Version;
use tracing::info;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};

fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a &ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, &cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

pub(crate) struct MigrateResult {
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

/// Migrates all user items and loads them into `state`.
///
/// Returns the builtin updates corresponding to all user items.
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    _now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = tx.get_catalog_content_version();
    let catalog_version = match catalog_version {
        Some(v) => Version::parse(v)?,
        None => Version::new(0, 0, 0),
    };

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    rewrite_ast_items(tx, |_tx, _id, stmt| {
        // Add per-item AST migrations below.
        //
        // Each migration should be a function that takes `stmt` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `stmt` will be staged for commit to the catalog.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        Ok(())
    })?;

    // Load items into catalog. We make sure to consolidate the old updates with the new updates to
    // avoid trying to apply unmigrated items.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Since some migrations might introduce updates that are not item updates ("post-item"
    // updates), we sequester those so they can be applied with the other post-item updates after
    // migrations, to avoid accumulating negative diffs.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        // The only post-item update kind we currently generate is to
        // update storage collection metadata.
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();

    let force_source_table_syntax = state.system_config().force_source_table_syntax();
    // When this flag is set, the legacy syntax is denied. But here we are about to perform a
    // migration which requires that we parse the current catalog state. To proceed, we temporarily
    // disable the flag and then reset it after migrations are done.
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("off"))
            .expect("known parameter");
    }

    let mut ast_builtin_table_updates = state
        .apply_updates_for_bootstrap(item_updates, local_expr_cache)
        .await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    // Special block for the `rewrite_sources_to_tables` migration, since it is gated behind a
    // feature flag and needs to update multiple AST items at once.
    if force_source_table_syntax {
        rewrite_sources_to_tables(tx, &conn_cat)?;
    }

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        // Add per-item, post-planning AST migrations below. Most
        // migrations should be in the above `rewrite_ast_items` block.
        //
        // Each migration should be a function that takes `item` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `item` will be staged for commit to the catalog.
        //
        // Be careful if you reference `conn_cat`. Doing so is *weird*,
        // as you'll be rewriting the catalog while looking at it. If
        // possible, make your migration independent of `conn_cat`, and only
        // consider a single item at a time.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        Ok(())
    })?;

    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("on"))
            .expect("known parameter");
    }

    // Add whole-catalog migrations below.
    //
    // Each migration should be a function that takes `tx` and `conn_cat` as
    // input and stages arbitrary transformations to the catalog on `tx`.

    let op_item_updates = tx.get_and_commit_op_updates();
    let item_builtin_table_updates = state
        .apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
        .await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );
    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        post_item_updates,
    })
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.
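//
// For example, a hypothetical AST rewrite added in v0.99 that drops an obsolete option
// might be named (illustrative only, not an existing migration):
//   ast_rewrite_drop_obsolete_option_v0_99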

/// Migrates all sources to use the new sources-as-tables model.
///
/// Suppose we have an old-style source named `source_name` with global id `source_id`. The source
/// will also have an associated progress source named `progress_name` (which is almost always
/// `source_name` + "_progress") with global id `progress_id`.
///
/// We have two constraints to satisfy. The migration:
///   1. should not change the schema of a global id *if that global id maps to a
///      durable collection*. The reason for this constraint is that when a durable collection
///      (i.e. one backed by a persist shard) is opened, persist will verify that the schema is
///      the expected one. If we change the Create SQL of a global id to a non-durable definition
///      (e.g. a view) then we are free to also change the schema.
///   2. should make it such that the SQL object that is constructed with a new-style `CREATE
///      SOURCE` statement contains the progress data and all other objects related to the
///      old-style source depend on that object.
///
/// With these constraints we consider two cases.
///
/// ## Case 1: A multi-output source
///
/// Multi-output sources have a useless dummy output as the contents of `source_name`. So we
/// re-purpose that name to be the `CREATE SOURCE` statement and make `progress_name` be a view
/// of `source_name`. Since the main source is a durable object we must move `source_name` and the
/// corresponding new-style `CREATE SOURCE` statement under `progress_id`. Then `progress_name` can
/// move to `source_id`, and since it becomes a view we are free to change its schema.
///
/// Visually, we are changing this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                 | Schema   | Durable |
/// +-------------+---------------+----------------------------+----------+---------+
/// | source_id   | source_name   | CREATE SOURCE (old-style)  | empty    | yes     |
/// | progress_id | progress_name | CREATE SUBSOURCE ..        | progress | yes     |
///
/// to this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                | Schema        | Durable |
/// +-------------+---------------+---------------------------+---------------+---------+
/// | source_id   | progress_name | CREATE VIEW               | progress data | no      |
/// | progress_id | source_name   | CREATE SOURCE (new-style) | progress data | yes     |
///
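/// As a rough, illustrative sketch (the object names are made up and the SQL is abbreviated,
/// not the exact create SQL the migration emits), a multi-output Postgres source might go from:
///
/// ```text
/// -- at source_id, named pg_src
/// CREATE SOURCE pg_src FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'pub') FOR ALL TABLES;
/// -- at progress_id, named pg_src_progress
/// CREATE SUBSOURCE pg_src_progress (..) WITH (PROGRESS = true);
/// ```
///
/// to roughly:
///
/// ```text
/// -- at progress_id, now named pg_src: a new-style source holding the progress data
/// CREATE SOURCE pg_src FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'pub');
/// -- at source_id, now named pg_src_progress: a view over the new-style source
/// CREATE VIEW pg_src_progress AS TABLE pg_src;
/// ```
///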
/// ## Case 2: A single-output source
///
/// Single-output sources have data as the contents of `source_name` and so we can't repurpose that
/// name to be the `CREATE SOURCE` statement. Here we leave everything intact except for the
/// Create SQL of each object. Namely, the old-style `CREATE SOURCE` statement becomes a `CREATE
/// TABLE FROM SOURCE` and the old-style `CREATE SUBSOURCE .. PROGRESS` becomes a new-style `CREATE
/// SOURCE` statement.
///
/// Visually, we are changing this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                 | Schema      | Durable |
/// +-------------+---------------+----------------------------+-------------+---------+
/// | source_id   | source_name   | CREATE SOURCE (old-style)  | source data | yes     |
/// | progress_id | progress_name | CREATE SUBSOURCE ..        | progress    | yes     |
///
/// to this mapping:
///
/// |  Global ID  |  SQL Name     | Create SQL                 | Schema      | Durable |
/// +-------------+---------------+----------------------------+-------------+---------+
/// | source_id   | source_name   | CREATE TABLE FROM SOURCE   | source data | yes     |
/// | progress_id | progress_name | CREATE SOURCE (new-style)  | progress    | yes     |
///
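/// Again as an illustrative sketch (made-up names, abbreviated SQL rather than the exact
/// rendered statements), a single-output Kafka source might go from:
///
/// ```text
/// -- at source_id, named kafka_src
/// CREATE SOURCE kafka_src FROM KAFKA CONNECTION kafka_conn (TOPIC 'events') FORMAT JSON;
/// -- at progress_id, named kafka_src_progress
/// CREATE SUBSOURCE kafka_src_progress (..) WITH (PROGRESS = true);
/// ```
///
/// to roughly:
///
/// ```text
/// -- at source_id, still named kafka_src: a table reading from the new-style source
/// CREATE TABLE kafka_src FROM SOURCE kafka_src_progress (REFERENCE 'events') FORMAT JSON;
/// -- at progress_id, still named kafka_src_progress: a new-style source holding the progress data
/// CREATE SOURCE kafka_src_progress FROM KAFKA CONNECTION kafka_conn (TOPIC 'events');
/// ```
///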
/// ## Subsource migration
///
/// After the migration goes over all the `CREATE SOURCE` statements, it transforms each
/// non-progress `CREATE SUBSOURCE` statement into a `CREATE TABLE FROM SOURCE` statement that
/// points to the original `source_name`, but with the altered global id (which is now
/// `progress_id`).
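///
/// For example (again only a sketch with made-up names, not the exact rendered SQL), a Postgres
/// subsource such as:
///
/// ```text
/// CREATE SUBSOURCE t1 (a int4, b text) OF SOURCE pg_src WITH (EXTERNAL REFERENCE = 'public.t1');
/// ```
///
/// becomes roughly:
///
/// ```text
/// CREATE TABLE t1 (a int4, b text) FROM SOURCE pg_src (REFERENCE 'public.t1');
/// ```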
fn rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    catalog: &ConnCatalog<'_>,
) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };

    let mut updated_items = BTreeMap::new();

    let mut sources = vec![];
    let mut subsources = vec![];

    for item in tx.get_items() {
        let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        match stmt {
            Statement::CreateSubsource(stmt) => subsources.push((item, stmt)),
            Statement::CreateSource(stmt) => sources.push((item, stmt)),
            _ => {}
        }
    }

    let mut pending_progress_items = BTreeMap::new();
    let mut migrated_source_ids = BTreeMap::new();
    // We first go over the sources which, depending on their kind, determine what happens with
    // the progress statements.
    for (mut source_item, source_stmt) in sources {
        let CreateSourceStatement {
            name,
            in_cluster,
            col_names,
            mut connection,
            include_metadata,
            format,
            envelope,
            if_not_exists,
            key_constraint,
            with_options,
            external_references,
            progress_subsource,
        } = source_stmt;

        let (progress_name, progress_item) = match progress_subsource {
            Some(DeferredItemName::Named(RawItemName::Name(name))) => {
                let partial_name = normalize::unresolved_item_name(name.clone())?;
                (name, catalog.resolve_item(&partial_name)?)
            }
            Some(DeferredItemName::Named(RawItemName::Id(id, name, _))) => {
                let gid = id.parse()?;
                (name, catalog.get_item(&gid))
            }
            Some(DeferredItemName::Deferred(_)) => {
                unreachable!("invalid progress subsource")
            }
            None => {
                info!("migrate: skipping already migrated source: {name}");
                continue;
            }
        };
        let raw_progress_name =
            RawItemName::Id(progress_item.id().to_string(), progress_name.clone(), None);

        // We need to jump through some hoops to get to the raw item name of the source.
        let catalog_item = catalog.get_item(&source_item.id);
        let source_name: &QualifiedItemName = catalog_item.name();
        let full_source_name: FullItemName = catalog.resolve_full_name(source_name);
        let source_name: UnresolvedItemName = normalize::unresolve(full_source_name.clone());

        // First, strip the connection options that we no longer need.
        match &mut connection {
            CreateSourceConnection::Postgres { options, .. } => {
                options.retain(|o| match o.name {
                    PgConfigOptionName::Details | PgConfigOptionName::Publication => true,
                    PgConfigOptionName::TextColumns | PgConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::SqlServer { options, .. } => {
                options.retain(|o| match o.name {
                    SqlServerConfigOptionName::Details => true,
                    SqlServerConfigOptionName::TextColumns
                    | SqlServerConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::MySql { options, .. } => {
                options.retain(|o| match o.name {
                    MySqlConfigOptionName::Details => true,
                    MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns => {
                        false
                    }
                });
            }
            CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. } => {
            }
        }

        // Then, figure out the new statements for the progress and source.
        let (new_progress_name, new_progress_stmt, new_source_name, new_source_stmt) =
            match connection {
                connection @ (CreateSourceConnection::Postgres { .. }
                | CreateSourceConnection::MySql { .. }
                | CreateSourceConnection::SqlServer { .. }
                | CreateSourceConnection::LoadGenerator {
                    generator:
                        LoadGenerator::Tpch | LoadGenerator::Auction | LoadGenerator::Marketing,
                    ..
                }) => {
                    // Assert the expected state of the source.
                    assert_eq!(col_names, &[]);
                    assert_eq!(key_constraint, None);
                    assert_eq!(format, None);
                    assert_eq!(envelope, None);
                    assert_eq!(include_metadata, &[]);
                    assert_eq!(external_references, None);

                    // This is a dummy replacement statement for the source object of multi-output
                    // sources. It describes the query `TABLE source_name`. This ensures that
                    // whoever used to run SELECT queries against the `source_name` + "_progress"
                    // object still gets the same data after the migration. This switch does
                    // change the schema of the object with `source_item.id`, but because we're
                    // turning it into a view, which is not durable, that's okay. We'll never open
                    // a persist shard for this global id anymore.
                    let dummy_source_stmt = Statement::CreateView(CreateViewStatement {
                        if_exists: IfExistsBehavior::Error,
                        temporary: false,
                        definition: ViewDefinition {
                            name: progress_name,
                            columns: vec![],
                            query: Query {
                                ctes: CteBlock::Simple(vec![]),
                                body: SetExpr::Table(RawItemName::Id(
                                    progress_item.id().to_string(),
                                    source_name.clone(),
                                    None,
                                )),
                                order_by: vec![],
                                limit: None,
                                offset: None,
                            },
                        },
                    });

                    let new_progress_stmt = CreateSourceStatement {
                        name: source_name.clone(),
                        in_cluster,
                        col_names: vec![],
                        connection,
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };

                    migrated_source_ids.insert(source_item.id, progress_item.id());

                    (
                        full_source_name.item,
                        new_progress_stmt,
                        progress_item.name().item.clone(),
                        dummy_source_stmt,
                    )
                }
                CreateSourceConnection::Kafka {
                    options,
                    connection,
                } => {
                    let constraints = if let Some(_key_constraint) = key_constraint {
                        // Primary key not enforced is not enabled for anyone.
                        // TODO: remove the feature altogether.
                        vec![]
                    } else {
                        vec![]
                    };

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // All source tables must have a `details` option, which is a serialized proto
                    // describing any source-specific details for this table statement.
                    let details = SourceExportStatementDetails::Kafka {};
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
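                    // For illustration only: in the rendered create SQL this shows up roughly as
                    //   WITH (DETAILS = '<hex-encoded details proto>')
                    // where the hex string is the serialized proto produced above.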
                    // The external reference for a Kafka source is just the topic name.
                    let topic_option = options
                        .iter()
                        .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                        .expect("kafka sources must have a topic");
                    let topic = match &topic_option.value {
                        Some(WithOptionValue::Value(Value::String(topic))) => topic,
                        _ => unreachable!("topic must be a string"),
                    };
                    let external_reference = UnresolvedItemName::qualified(&[Ident::new(topic)?]);

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::Kafka {
                            options,
                            connection,
                        },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
                CreateSourceConnection::LoadGenerator {
                    generator:
                        generator @ (LoadGenerator::Clock
                        | LoadGenerator::Counter
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue),
                    options,
                } => {
                    let constraints = if let Some(_key_constraint) = key_constraint {
                        // Should we ignore not-enforced primary key constraints here?
                        vec![]
                    } else {
                        vec![]
                    };

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // All source tables must have a `details` option, which is a serialized proto
                    // describing any source-specific details for this table statement.
                    let details = SourceExportStatementDetails::LoadGenerator {
                        output: LoadGeneratorOutput::Default,
                    };
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
                    // Since these load generators are single-output, the external reference
                    // uses the schema name for both namespace and name.
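                    // For illustration only: a counter load generator ends up with an external
                    // reference roughly of the form `<load generator database>.counter.counter`,
                    // with the schema name doubled as the item name.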
                    let external_reference = FullItemName {
                        database: mz_sql::names::RawDatabaseSpecifier::Name(
                            mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                .to_owned(),
                        ),
                        schema: generator.schema_name().to_string(),
                        item: generator.schema_name().to_string(),
                    };

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference.into()),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::LoadGenerator { generator, options },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
            };

        // The source can be updated right away but the replacement progress statement will
        // be installed in the next loop where we go over subsources.

        info!(
            "migrate: converted source {} to {}",
            source_item.create_sql, new_source_stmt
        );
        source_item.name = new_source_name.clone();
        source_item.create_sql = new_source_stmt.to_ast_string_stable();
        updated_items.insert(source_item.id, source_item);
        pending_progress_items.insert(progress_item.id(), (new_progress_name, new_progress_stmt));
    }

    for (mut item, stmt) in subsources {
        match stmt {
            // Migrate progress statements to the corresponding statement produced in the
            // previous step.
            CreateSubsourceStatement {
                of_source: None, ..
            } => {
                let Some((new_name, new_stmt)) = pending_progress_items.remove(&item.id) else {
                    panic!("encountered orphan progress subsource id: {}", item.id)
                };
                item.name = new_name;
                item.create_sql = new_stmt.to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
            // Migrate each `CREATE SUBSOURCE` statement to an equivalent
            // `CREATE TABLE ... FROM SOURCE` statement.
            CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source: Some(raw_source_name),
                if_not_exists,
                mut with_options,
            } => {
                let new_raw_source_name = match raw_source_name {
                    RawItemName::Id(old_id, name, None) => {
                        let old_id: CatalogItemId = old_id.parse().expect("well formed");
                        let new_id = migrated_source_ids[&old_id].clone();
                        RawItemName::Id(new_id.to_string(), name, None)
                    }
                    _ => unreachable!("unexpected source name: {raw_source_name}"),
                };
                // The external reference is a `with_option` on subsource statements but is a
                // separate field on table statements.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                // The `details` option on both subsources and tables is identical, using the same
                                // ProtoSourceExportStatementDetails serialized value.
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                                name: TableFromSourceOptionName::RetainHistory,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("This option is handled separately above.")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source: new_raw_source_name,
                    external_reference: Some(external_reference),
                    with_options,
                    // Subsources don't have `envelope`, `include_metadata`, or `format` options.
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
        }
    }
    assert!(
        pending_progress_items.is_empty(),
        "unexpected residual progress items: {pending_progress_items:?}"
    );

    tx.update_items(updated_items)?;

    Ok(())
}

// Durable migrations

/// Migrations that run only on the durable catalog before any data is loaded into memory.
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // Migrate the expression cache to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // Migrate the builtin migration shard to a new shard. We're updating the keys to use the
    // explicit binary version instead of the deploy generation.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }

    if tx
        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
        .is_none()
    {
        let mut nonce = [0u8; 24];
        let _ = openssl::rand::rand_bytes(&mut nonce).expect("failed to generate nonce");
        let nonce = BASE64_STANDARD.encode(nonce);
        tx.set_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string(), Some(nonce))?;
    }

    Ok(())
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.

// Remove PARTITION STRATEGY from CREATE SINK statements.
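// For example (illustrative only, not the exact catalog SQL), a sink created as
//   CREATE SINK s FROM t INTO KAFKA CONNECTION k (TOPIC 'snk') FORMAT JSON
//   ENVELOPE DEBEZIUM WITH (PARTITION STRATEGY = 'v1')
// is rewritten so that the `PARTITION STRATEGY` option is dropped, leaving the rest of the
// statement untouched.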
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}