mz_adapter/catalog/migrate.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::Transaction;
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::CreateSinkOptionName;
use mz_sql::ast::display::AstDisplay;
use mz_sql::names::FullItemName;
use mz_sql_parser::ast::{IdentError, Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use semver::Version;
use tracing::info;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};

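/// Applies the per-item AST rewrite `f` to every item in the catalog.
///
/// Each item's `create_sql` is parsed into an AST, handed to `f` for mutation, and the
/// re-serialized statement is staged on the transaction via `update_items`.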
fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

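/// Like [`rewrite_ast_items`], but additionally passes the in-memory catalog handle `cat`
/// to `f`, so post-planning migrations can resolve names against the loaded catalog state.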
fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a &ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, &cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

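/// The output of [`migrate`]: builtin table updates for the migrated items, plus any
/// non-item ("post-item") updates that must be applied after the item updates.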
pub(crate) struct MigrateResult {
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

/// Migrates all user items and loads them into `state`.
///
/// Returns the builtin updates corresponding to all user items.
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = tx.get_catalog_content_version();
    let catalog_version = match catalog_version {
        Some(v) => Version::parse(v)?,
        None => Version::new(0, 0, 0),
    };

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    // Special block for the `ast_rewrite_sources_to_tables` migration, since it requires a
    // feature flag and needs to update multiple AST items at once.
    if state.system_config().force_source_table_syntax() {
        ast_rewrite_sources_to_tables(tx, now)?;
    }

    rewrite_ast_items(tx, |_tx, _id, stmt| {
        // Add per-item AST migrations below.
        //
        // Each migration should be a function that takes `stmt` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `stmt` will be staged for commit to the catalog.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        Ok(())
    })?;

    // Load items into catalog. We make sure to consolidate the old updates with the new updates to
    // avoid trying to apply unmigrated items.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Since some migrations might introduce non-item 'post-item' updates, we sequester those
    // so they can be applied with other post-item updates after migrations to avoid
    // accumulating negative diffs.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        // The only post-item update kind we currently generate is to
        // update storage collection metadata.
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();
    let mut ast_builtin_table_updates = state
        .apply_updates_for_bootstrap(item_updates, local_expr_cache)
        .await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        // Add per-item, post-planning AST migrations below. Most
        // migrations should be in the above `rewrite_ast_items` block.
        //
        // Each migration should be a function that takes `item` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `item` will be staged for commit to the catalog.
        //
        // Be careful if you reference `conn_cat`. Doing so is *weird*,
        // as you'll be rewriting the catalog while looking at it. If
        // possible, make your migration independent of `conn_cat`, and only
        // consider a single item at a time.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        Ok(())
    })?;

    // Add whole-catalog migrations below.
    //
    // Each migration should be a function that takes `tx` and `conn_cat` as
    // input and stages arbitrary transformations to the catalog on `tx`.

    let op_item_updates = tx.get_and_commit_op_updates();
    let item_builtin_table_updates = state
        .apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
        .await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );
    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        post_item_updates,
    })
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.

/// Migrates all sources to use the new sources-as-tables model.
///
/// First we migrate existing `CREATE SUBSOURCE` statements, turning them into
/// `CREATE TABLE .. FROM SOURCE` statements. This covers existing Postgres,
/// MySQL, and multi-output (tpch, auction, marketing) load-generator subsources.
///
/// Second we migrate existing `CREATE SOURCE` statements for these multi-output
/// sources to remove any subsource-specific options (e.g. TEXT COLUMNS).
///
/// Third we migrate existing single-output `CREATE SOURCE` statements.
/// This includes existing Kafka and single-output load-generator
/// sources. This will generate an additional `CREATE TABLE .. FROM SOURCE`
/// statement that copies over all the export-specific options. This table will
/// be assigned the existing source statement's persist shard but use a new GlobalId.
/// The original source statement will be updated to remove the export-specific options,
/// be renamed to `<original_name>_source`, and use a new empty shard while keeping its
/// same GlobalId.
fn ast_rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    now: NowFn,
) -> Result<(), anyhow::Error> {
    use maplit::btreemap;
    use maplit::btreeset;
    use mz_persist_types::ShardId;
    use mz_proto::RustType;
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };
    use mz_storage_client::controller::StorageTxn;
    use mz_storage_types::sources::SourceExportStatementDetails;
    use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
    use prost::Message;

    let items_with_statements = tx
        .get_items()
        .map(|item| {
            let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
            Ok((item, stmt))
        })
        .collect::<Result<Vec<_>, anyhow::Error>>()?;
    let items_with_statements_copied = items_with_statements.clone();

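    // Collect the existing item names per schema so that, when a single-output source is
    // renamed to `<name>_source` below, we can pick a name that does not collide with any
    // other item in the same schema.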
    let item_names_per_schema = items_with_statements_copied
        .iter()
        .map(|(item, _)| (item.schema_id.clone(), &item.name))
        .fold(BTreeMap::new(), |mut acc, (schema_id, name)| {
            acc.entry(schema_id)
                .or_insert_with(|| btreeset! {})
                .insert(name);
            acc
        });

    // Any CatalogItemId that should be changed to a new CatalogItemId in any statements that
    // reference it. This is necessary for ensuring downstream statements (e.g.
    // mat views, indexes) that reference a single-output source (e.g. kafka)
    // will now reference the corresponding new table, with the same data, instead.
    let mut changed_ids = BTreeMap::new();

    for (mut item, stmt) in items_with_statements {
        match stmt {
            // Migrate each `CREATE SUBSOURCE` statement to an equivalent
            // `CREATE TABLE ... FROM SOURCE` statement.
            Statement::CreateSubsource(CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source,
                if_not_exists,
                mut with_options,
            }) => {
                let raw_source_name = match of_source {
                    // If `of_source` is None then this is a `progress` subsource which we
                    // are not migrating as they are not currently relevant to the new table model.
                    None => continue,
                    Some(name) => name,
                };
                let source = match raw_source_name {
                    // Some legacy subsources have named-only references to their `of_source`
                    // so we ensure we always use an ID-based reference in the stored
                    // `CREATE TABLE ... FROM SOURCE` statements.
                    RawItemName::Name(name) => {
                        // Convert the name reference to an ID reference.
                        let (source_item, _) = items_with_statements_copied
                            .iter()
                            .find(|(_, statement)| match statement {
                                Statement::CreateSource(stmt) => stmt.name == name,
                                _ => false,
                            })
                            .expect("source must exist");
                        RawItemName::Id(source_item.id.to_string(), name, None)
                    }
                    RawItemName::Id(..) => raw_source_name,
                };

                // The external reference is a `with_option` on subsource statements but is a
                // separate field on table statements.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                // The `details` option on both subsources and tables is identical, using the same
                                // ProtoSourceExportStatementDetails serialized value.
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("This option is handled separately above.")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: mz_sql::ast::TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source,
                    external_reference: Some(external_reference.clone()),
                    with_options,
                    // Subsources don't have `envelope`, `include_metadata`, or `format` options.
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                tx.update_item(item.id, item)?;
            }

            // Postgres sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    mut conn @ (CreateSourceConnection::Postgres { .. }
                    | CreateSourceConnection::Yugabyte { .. }),
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::Postgres { options, .. } => options,
                    CreateSourceConnection::Yugabyte { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // This option storing text columns on the primary source statement is redundant
                // with the option on subsource statements so can just be removed.
                // This was kept for round-tripping of `CREATE SOURCE` statements that automatically
                // generated subsources, which is no longer necessary.
                if options
                    .iter()
                    .any(|o| matches!(o.name, PgConfigOptionName::TextColumns))
                {
                    options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted postgres source {stmt} to remove subsource options");
                }
            }
            // MySQL sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection: mut conn @ CreateSourceConnection::MySql { .. },
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
                ..
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::MySql { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // These options storing text and exclude columns on the primary source statement
                // are redundant with the options on subsource statements so can just be removed.
                // They were kept for round-tripping of `CREATE SOURCE` statements that automatically
                // generated subsources, which is no longer necessary.
                if options.iter().any(|o| {
                    matches!(
                        o.name,
                        MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns
                    )
                }) {
                    options.retain(|o| {
                        !matches!(
                            o.name,
                            MySqlConfigOptionName::TextColumns
                                | MySqlConfigOptionName::ExcludeColumns
                        )
                    });
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted mysql source {stmt} to remove subsource options");
                }
            }
            // Multi-output load generator sources whose subsources are already
            // migrated above. There is no need to remove any options from this
            // statement since they are not export-specific.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Auction | LoadGenerator::Marketing | LoadGenerator::Tpch,
                        ..
                    },
                ..
            }) => {}
            // Single-output sources that need to be migrated to tables. These sources currently output
            // data to the primary collection of the source statement. We will create a new table
            // statement for them and move all export-specific options over from the source statement,
            // while moving the `CREATE SOURCE` statement to a new name and moving its shard to the
            // new table statement.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    conn @ (CreateSourceConnection::Kafka { .. }
                    | CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Clock
                            | LoadGenerator::Datums
                            | LoadGenerator::Counter
                            | LoadGenerator::KeyValue,
                        ..
                    }),
                name,
                col_names,
                include_metadata,
                format,
                envelope,
                with_options,
                if_not_exists,
                in_cluster,
                progress_subsource,
                external_references,
                key_constraint,
            }) => {
                // To check if this is a source that has already been migrated we use a basic
                // heuristic: if there is at least one existing table for the source, and if
                // the envelope/format/include_metadata options are empty, we assume it's
                // already been migrated.
                let tables_for_source =
                    items_with_statements_copied
                        .iter()
                        .any(|(_, statement)| match statement {
                            Statement::CreateTableFromSource(stmt) => {
                                let source: CatalogItemId = match &stmt.source {
                                    RawItemName::Name(_) => {
                                        unreachable!("tables store source as ID")
                                    }
                                    RawItemName::Id(source_id, _, _) => {
                                        source_id.parse().expect("valid id")
                                    }
                                };
                                source == item.id
                            }
                            _ => false,
                        });
                if tables_for_source
                    && envelope.is_none()
                    && format.is_none()
                    && include_metadata.is_empty()
                {
                    info!("migrate: skipping already migrated source: {}", name);
                    continue;
                }

                // Use the current source name as the new table name, and rename the source to
                // `<source_name>_source`. This is intended to allow users to continue using
                // queries that reference the source name, since they will now need to query the
                // table instead.

                assert_eq!(
                    item.name,
                    name.0.last().expect("at least one ident").to_string()
                );
                // First find an unused name within the same schema to avoid conflicts.
                let is_valid = |new_source_ident: &Ident| {
                    if item_names_per_schema
                        .get(&item.schema_id)
                        .expect("schema must exist")
                        .contains(&new_source_ident.to_string())
                    {
                        Ok::<_, IdentError>(false)
                    } else {
                        Ok(true)
                    }
                };
                let new_source_ident =
                    Ident::try_generate_name(item.name.clone(), "_source", is_valid)?;

                // We will use the original item name for the new table item.
                let table_item_name = item.name.clone();

                // Update the source item/statement to use the new name.
                let mut new_source_name = name.clone();
                *new_source_name.0.last_mut().expect("at least one ident") =
                    new_source_ident.clone();
                item.name = new_source_ident.to_string();

                // A reference to the source that will be included in the table statement
                let source_ref =
                    RawItemName::Id(item.id.to_string(), new_source_name.clone(), None);

                let columns = if col_names.is_empty() {
                    TableFromSourceColumns::NotSpecified
                } else {
                    TableFromSourceColumns::Named(col_names)
                };

                // All source tables must have a `details` option, which is a serialized proto
                // describing any source-specific details for this table statement.
                let details = match &conn {
                    // For kafka sources this proto is currently empty.
                    CreateSourceConnection::Kafka { .. } => SourceExportStatementDetails::Kafka {},
                    CreateSourceConnection::LoadGenerator { .. } => {
                        // Since these load generators are single-output we use the default output.
                        SourceExportStatementDetails::LoadGenerator {
                            output: LoadGeneratorOutput::Default,
                        }
                    }
                    _ => unreachable!("match determined above"),
                };
                let table_with_options = vec![TableFromSourceOption {
                    name: TableFromSourceOptionName::Details,
                    value: Some(WithOptionValue::Value(Value::String(hex::encode(
                        details.into_proto().encode_to_vec(),
                    )))),
                }];

                // Generate the same external-reference that would have been generated
                // during purification for single-output sources.
                let external_reference = match &conn {
                    CreateSourceConnection::Kafka { options, .. } => {
                        let topic_option = options
                            .iter()
                            .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                            .expect("kafka sources must have a topic");
                        let topic = match &topic_option.value {
                            Some(WithOptionValue::Value(Value::String(topic))) => topic,
                            _ => unreachable!("topic must be a string"),
                        };

                        Some(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
                    }
                    CreateSourceConnection::LoadGenerator { generator, .. } => {
                        // Since these load generators are single-output the external reference
                        // uses the schema-name for both namespace and name.
                        let name = FullItemName {
                            database: mz_sql::names::RawDatabaseSpecifier::Name(
                                mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                    .to_owned(),
                            ),
                            schema: generator.schema_name().to_string(),
                            item: generator.schema_name().to_string(),
                        };
                        Some(UnresolvedItemName::from(name))
                    }
                    _ => unreachable!("match determined above"),
                };

                // The new table statement, stealing the name and the export-specific fields from
                // the create source statement.
                let table = CreateTableFromSourceStatement {
                    name,
                    constraints: vec![],
                    columns,
                    if_not_exists: false,
                    source: source_ref,
                    external_reference,
                    with_options: table_with_options,
                    envelope,
                    include_metadata,
                    format,
                };

                // The source statement with a new name and many of its fields emptied
                let source = CreateSourceStatement {
                    connection: conn,
                    name: new_source_name,
                    if_not_exists,
                    in_cluster,
                    include_metadata: vec![],
                    format: None,
                    envelope: None,
                    col_names: vec![],
                    with_options,
                    key_constraint,
                    external_references,
                    progress_subsource,
                };

                let source_id = item.id;
                let source_global_id = item.global_id;
                let schema_id = item.schema_id.clone();
                let schema = tx.get_schema(&item.schema_id).expect("schema must exist");

                let owner_id = item.owner_id.clone();
                let privileges = item.privileges.clone();
                let extra_versions = item.extra_versions.clone();

                // Update the source statement in the catalog first, since the name will
                // otherwise conflict with the new table statement.
                info!("migrate: updated source {} to {source}", item.create_sql);
                item.create_sql = Statement::CreateSource(source).to_ast_string_stable();
                tx.update_item(item.id, item)?;

                // Insert the new table statement into the catalog with a new id.
                let ids = tx.allocate_user_item_ids(1)?;
                let (new_table_id, new_table_global_id) = ids[0];
                info!("migrate: added table {new_table_id}: {table}");
                tx.insert_user_item(
                    new_table_id,
                    new_table_global_id,
                    schema_id,
                    &table_item_name,
                    table.to_ast_string_stable(),
                    owner_id,
                    privileges,
                    &Default::default(),
                    extra_versions,
                )?;
                // We need to move the shard currently attached to the source statement to the
                // table statement such that the existing data in the shard is preserved and can
                // be queried on the new table statement. However, we need to keep the GlobalId of
                // the source the same, to preserve existing references to that statement in
                // external tools such as DBT and Terraform. We will insert a new shard for the source
                // statement which will be automatically created after the migration is complete.
                let new_source_shard = ShardId::new();
                let (source_global_id, existing_source_shard) = tx
                    .delete_collection_metadata(btreeset! {source_global_id})
                    .pop()
                    .expect("shard should exist");
                tx.insert_collection_metadata(btreemap! {
                    new_table_global_id => existing_source_shard,
                    source_global_id => new_source_shard
                })?;

                add_to_audit_log(
                    tx,
                    mz_audit_log::EventType::Create,
                    mz_audit_log::ObjectType::Table,
                    mz_audit_log::EventDetails::IdFullNameV1(mz_audit_log::IdFullNameV1 {
                        id: new_table_id.to_string(),
                        name: mz_audit_log::FullNameV1 {
                            database: schema
                                .database_id
                                .map(|d| d.to_string())
                                .unwrap_or_default(),
                            schema: schema.name,
                            item: table_item_name,
                        },
                    }),
                    now(),
                )?;

                // We also need to update any other statements that reference the source to use the new
                // table id/name instead.
                changed_ids.insert(source_id, new_table_id);
            }

            // TODO(sql_server1): Consider how to migrate SQL Server subsources
            // to the source table world.
            Statement::CreateSource(CreateSourceStatement {
                connection: CreateSourceConnection::SqlServer { .. },
                ..
            }) => (),

            #[expect(unreachable_patterns)]
            Statement::CreateSource(_) => {}
            _ => (),
        }
    }

    let mut updated_items = BTreeMap::new();
    for (mut item, mut statement) in items_with_statements_copied {
        match &statement {
            // Don't rewrite any of the statements we just migrated.
            Statement::CreateSource(_) => {}
            Statement::CreateSubsource(_) => {}
            Statement::CreateTableFromSource(_) => {}
            // We need to rewrite any statements that reference a source id to use the new
            // table id instead, since any contained data in the source will now be in the table.
            // This assumes the table has stolen the source's name, which is the case
            // for all sources that were migrated.
            _ => {
                if mz_sql::names::modify_dependency_item_ids(&mut statement, &changed_ids) {
                    info!("migrate: updated dependency reference in statement {statement}");
                    item.create_sql = statement.to_ast_string_stable();
                    updated_items.insert(item.id, item);
                }
            }
        }
    }
    if !updated_items.is_empty() {
        tx.update_items(updated_items)?;
    }

    Ok(())
}

// Durable migrations

/// Migrations that run only on the durable catalog before any data is loaded into memory.
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // Migrate the expression cache to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // Migrate the builtin migration shard to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }
    Ok(())
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.

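/// Appends a versioned event to the durable audit log, allocating the next audit-log ID
/// from the catalog's ID allocator.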
fn add_to_audit_log(
    tx: &mut Transaction,
    event_type: mz_audit_log::EventType,
    object_type: mz_audit_log::ObjectType,
    details: mz_audit_log::EventDetails,
    occurred_at: mz_ore::now::EpochMillis,
) -> Result<(), anyhow::Error> {
    let id = tx.get_and_increment_id(mz_catalog::durable::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
    let event =
        mz_audit_log::VersionedEvent::new(id, event_type, object_type, details, None, occurred_at);
    tx.insert_audit_log_event(event);
    Ok(())
}

// Remove PARTITION STRATEGY from CREATE SINK statements.
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}