mz_adapter/catalog/migrate.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::Transaction;
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::CreateSinkOptionName;
use mz_sql::ast::display::AstDisplay;
use mz_sql::names::FullItemName;
use mz_sql_parser::ast::{IdentError, Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use semver::Version;
use tracing::info;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};

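/// Parses the `create_sql` of every item in `tx`, applies `f` to each parsed
/// statement, and stages the (possibly rewritten) statements back onto the
/// transaction.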
fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

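/// Like `rewrite_ast_items`, but additionally provides `f` with a [`ConnCatalog`]
/// so that migrations can resolve catalog state while rewriting each statement.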
fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a &ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, &cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

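/// The output of [`migrate`]: builtin table updates for the migrated user items,
/// plus any non-item updates that must be applied after item migration.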
pub(crate) struct MigrateResult {
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

/// Migrates all user items and loads them into `state`.
///
/// Returns the builtin updates corresponding to all user items.
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = tx.get_catalog_content_version();
    let catalog_version = match catalog_version {
        Some(v) => Version::parse(v)?,
        None => Version::new(0, 0, 0),
    };

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    // Special block for the `ast_rewrite_sources_to_tables` migration, since it is
    // gated behind a feature flag and needs to update multiple AST items at once.
    if state.system_config().force_source_table_syntax() {
        ast_rewrite_sources_to_tables(tx, now)?;
    }

    rewrite_ast_items(tx, |_tx, _id, stmt| {
        // Add per-item AST migrations below.
        //
        // Each migration should be a function that takes `stmt` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `stmt` will be staged for commit to the catalog.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        Ok(())
    })?;

    // Load items into catalog. We make sure to consolidate the old updates with the new updates to
    // avoid trying to apply unmigrated items.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Since some migrations might introduce non-item 'post-item' updates, we sequester
    // those so they can be applied after the migrations, alongside the other post-item
    // updates, to avoid accumulating negative diffs.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        // The only post-item update kind we currently generate is to
        // update storage collection metadata.
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();
    let mut ast_builtin_table_updates = state
        .apply_updates_for_bootstrap(item_updates, local_expr_cache)
        .await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        // Add per-item, post-planning AST migrations below. Most
        // migrations should be in the above `rewrite_ast_items` block.
        //
        // Each migration should be a function that takes `item` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `item` will be staged for commit to the catalog.
        //
        // Be careful if you reference `conn_cat`. Doing so is *weird*,
        // as you'll be rewriting the catalog while looking at it. If
        // possible, make your migration independent of `conn_cat`, and only
        // consider a single item at a time.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        Ok(())
    })?;

    // Add whole-catalog migrations below.
    //
    // Each migration should be a function that takes `tx` and `conn_cat` as
    // input and stages arbitrary transformations to the catalog on `tx`.

    let op_item_updates = tx.get_and_commit_op_updates();
    let item_builtin_table_updates = state
        .apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
        .await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );
    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        post_item_updates,
    })
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
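//
// For example, a hypothetical migration that removes a deprecated sink option in
// v0.99 could be named `ast_rewrite_remove_deprecated_sink_option_v0_99`.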
//
// Please include the adapter team on any code reviews that add or edit
// migrations.

/// Migrates all sources to use the new sources-as-tables model.
///
/// First we migrate existing `CREATE SUBSOURCE` statements, turning them into
/// `CREATE TABLE .. FROM SOURCE` statements. This covers existing Postgres,
/// MySQL, and multi-output (tpch, auction, marketing) load-generator subsources.
///
/// Second we migrate existing `CREATE SOURCE` statements for these multi-output
/// sources to remove any subsource-specific options (e.g. TEXT COLUMNS).
///
/// Third we migrate existing single-output `CREATE SOURCE` statements.
/// This includes existing Kafka and single-output load-generator
/// sources. This will generate an additional `CREATE TABLE .. FROM SOURCE`
/// statement that copies over all the export-specific options. This table will
/// point to the existing source statement's persist shard but use a new GlobalId.
/// The original source statement will be updated to remove the export-specific
/// options, be renamed to `<original_name>_source`, and point to a new, empty
/// shard while keeping its original GlobalId.
///
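/// As a rough sketch (names and exact rendering are illustrative, not the literal
/// catalog output), a stored subsource like
/// `CREATE SUBSOURCE t1 (a int4) OF SOURCE pg_src WITH (EXTERNAL REFERENCE = "public"."t1")`
/// becomes roughly
/// `CREATE TABLE t1 (a int4) FROM SOURCE pg_src (REFERENCE = "public"."t1")`.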
fn ast_rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    now: NowFn,
) -> Result<(), anyhow::Error> {
    use maplit::btreemap;
    use maplit::btreeset;
    use mz_persist_types::ShardId;
    use mz_proto::RustType;
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };
    use mz_storage_client::controller::StorageTxn;
    use mz_storage_types::sources::SourceExportStatementDetails;
    use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
    use prost::Message;

    let items_with_statements = tx
        .get_items()
        .map(|item| {
            let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
            Ok((item, stmt))
        })
        .collect::<Result<Vec<_>, anyhow::Error>>()?;
    let items_with_statements_copied = items_with_statements.clone();

    let item_names_per_schema = items_with_statements_copied
        .iter()
        .map(|(item, _)| (item.schema_id.clone(), &item.name))
        .fold(BTreeMap::new(), |mut acc, (schema_id, name)| {
            acc.entry(schema_id)
                .or_insert_with(|| btreeset! {})
                .insert(name);
            acc
        });

    // Any CatalogItemId that should be changed to a new CatalogItemId in any statements that
    // reference it. This is necessary for ensuring downstream statements (e.g.
    // mat views, indexes) that reference a single-output source (e.g. kafka)
    // will now reference the corresponding new table, with the same data, instead.
    let mut changed_ids = BTreeMap::new();

    for (mut item, stmt) in items_with_statements {
        match stmt {
            // Migrate each `CREATE SUBSOURCE` statement to an equivalent
            // `CREATE TABLE ... FROM SOURCE` statement.
            Statement::CreateSubsource(CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source,
                if_not_exists,
                mut with_options,
            }) => {
                let raw_source_name = match of_source {
                    // If `of_source` is None then this is a `progress` subsource which we
                    // are not migrating as they are not currently relevant to the new table model.
                    None => continue,
                    Some(name) => name,
                };
                let source = match raw_source_name {
                    // Some legacy subsources have named-only references to their `of_source`
                    // so we ensure we always use an ID-based reference in the stored
                    // `CREATE TABLE ... FROM SOURCE` statements.
                    RawItemName::Name(name) => {
                        // Convert the name reference to an ID reference.
                        let (source_item, _) = items_with_statements_copied
                            .iter()
                            .find(|(_, statement)| match statement {
                                Statement::CreateSource(stmt) => stmt.name == name,
                                _ => false,
                            })
                            .expect("source must exist");
                        RawItemName::Id(source_item.id.to_string(), name, None)
                    }
                    RawItemName::Id(..) => raw_source_name,
                };

                // The external reference is a `with_option` on subsource statements but is a
                // separate field on table statements.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                // The `details` option on both subsources and tables is identical, using the same
                                // ProtoSourceExportStatementDetails serialized value.
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                                name: TableFromSourceOptionName::RetainHistory,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("This option is handled separately above.")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: mz_sql::ast::TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source,
                    external_reference: Some(external_reference.clone()),
                    with_options,
                    // Subsources don't have `envelope`, `include_metadata`, or `format` options.
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                tx.update_item(item.id, item)?;
            }

            // Postgres sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection: mut conn @ CreateSourceConnection::Postgres { .. },
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::Postgres { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // This option storing text columns on the primary source statement is redundant
                // with the option on subsource statements so can just be removed.
                // This was kept for round-tripping of `CREATE SOURCE` statements that automatically
                // generated subsources, which is no longer necessary.
                if options
                    .iter()
                    .any(|o| matches!(o.name, PgConfigOptionName::TextColumns))
                {
                    options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted postgres source {stmt} to remove subsource options");
                }
            }
            // MySQL sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection: mut conn @ CreateSourceConnection::MySql { .. },
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
                ..
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::MySql { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // These options storing text and exclude columns on the primary source statement
                // are redundant with the options on subsource statements so can just be removed.
                // They were kept for round-tripping of `CREATE SOURCE` statements that automatically
                // generated subsources, which is no longer necessary.
                if options.iter().any(|o| {
                    matches!(
                        o.name,
                        MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns
                    )
                }) {
                    options.retain(|o| {
                        !matches!(
                            o.name,
                            MySqlConfigOptionName::TextColumns
                                | MySqlConfigOptionName::ExcludeColumns
                        )
                    });
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted mysql source {stmt} to remove subsource options");
                }
            }
            // Multi-output load generator sources whose subsources are already
            // migrated above. There is no need to remove any options from this
            // statement since they are not export-specific.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Auction | LoadGenerator::Marketing | LoadGenerator::Tpch,
                        ..
                    },
                ..
            }) => {}
            // Single-output sources that need to be migrated to tables. These sources currently
            // output data to the primary collection of the source statement. We will create a new
            // table statement for them, move all export-specific options over from the source
            // statement, rename the `CREATE SOURCE` statement, and move its shard to the new
            // table statement.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    conn @ (CreateSourceConnection::Kafka { .. }
                    | CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Clock
                            | LoadGenerator::Datums
                            | LoadGenerator::Counter
                            | LoadGenerator::KeyValue,
                        ..
                    }),
                name,
                col_names,
                include_metadata,
                format,
                envelope,
                with_options,
                if_not_exists,
                in_cluster,
                progress_subsource,
                external_references,
                key_constraint,
            }) => {
                // To check if this is a source that has already been migrated we use a basic
                // heuristic: if there is at least one existing table for the source, and if
                // the envelope/format/include_metadata options are empty, we assume it's
                // already been migrated.
                let tables_for_source =
                    items_with_statements_copied
                        .iter()
                        .any(|(_, statement)| match statement {
                            Statement::CreateTableFromSource(stmt) => {
                                let source: CatalogItemId = match &stmt.source {
                                    RawItemName::Name(_) => {
                                        unreachable!("tables store source as ID")
                                    }
                                    RawItemName::Id(source_id, _, _) => {
                                        source_id.parse().expect("valid id")
                                    }
                                };
                                source == item.id
                            }
                            _ => false,
                        });
                if tables_for_source
                    && envelope.is_none()
                    && format.is_none()
                    && include_metadata.is_empty()
                {
                    info!("migrate: skipping already migrated source: {}", name);
                    continue;
                }

                // Use the current source name as the new table name, and rename the source to
                // `<source_name>_source`. This allows existing queries that reference the source
                // name to keep working, since the source's data now lives in the table, which
                // takes over the original name.

                assert_eq!(
                    item.name,
                    name.0.last().expect("at least one ident").to_string()
                );
                // First find an unused name within the same schema to avoid conflicts.
                let is_valid = |new_source_ident: &Ident| {
                    if item_names_per_schema
                        .get(&item.schema_id)
                        .expect("schema must exist")
                        .contains(&new_source_ident.to_string())
                    {
                        Ok::<_, IdentError>(false)
                    } else {
                        Ok(true)
                    }
                };
                let new_source_ident =
                    Ident::try_generate_name(item.name.clone(), "_source", is_valid)?;

                // We will use the original item name for the new table item.
                let table_item_name = item.name.clone();

                // Update the source item/statement to use the new name.
                let mut new_source_name = name.clone();
                *new_source_name.0.last_mut().expect("at least one ident") =
                    new_source_ident.clone();
                item.name = new_source_ident.to_string();

                // A reference to the source that will be included in the table statement.
                let source_ref =
                    RawItemName::Id(item.id.to_string(), new_source_name.clone(), None);

                let columns = if col_names.is_empty() {
                    TableFromSourceColumns::NotSpecified
                } else {
                    TableFromSourceColumns::Named(col_names)
                };

                // All source tables must have a `details` option, which is a serialized proto
                // describing any source-specific details for this table statement.
                let details = match &conn {
                    // For kafka sources this proto is currently empty.
                    CreateSourceConnection::Kafka { .. } => SourceExportStatementDetails::Kafka {},
                    CreateSourceConnection::LoadGenerator { .. } => {
                        // Since these load generators are single-output we use the default output.
                        SourceExportStatementDetails::LoadGenerator {
                            output: LoadGeneratorOutput::Default,
                        }
                    }
                    _ => unreachable!("match determined above"),
                };
                let table_with_options = vec![TableFromSourceOption {
                    name: TableFromSourceOptionName::Details,
                    value: Some(WithOptionValue::Value(Value::String(hex::encode(
                        details.into_proto().encode_to_vec(),
                    )))),
                }];

                // Generate the same external-reference that would have been generated
                // during purification for single-output sources.
                let external_reference = match &conn {
                    CreateSourceConnection::Kafka { options, .. } => {
                        let topic_option = options
                            .iter()
                            .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                            .expect("kafka sources must have a topic");
                        let topic = match &topic_option.value {
                            Some(WithOptionValue::Value(Value::String(topic))) => topic,
                            _ => unreachable!("topic must be a string"),
                        };

                        Some(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
                    }
                    CreateSourceConnection::LoadGenerator { generator, .. } => {
                        // Since these load generators are single-output the external reference
                        // uses the schema-name for both namespace and name.
                        let name = FullItemName {
                            database: mz_sql::names::RawDatabaseSpecifier::Name(
                                mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                    .to_owned(),
                            ),
                            schema: generator.schema_name().to_string(),
                            item: generator.schema_name().to_string(),
                        };
                        Some(UnresolvedItemName::from(name))
                    }
                    _ => unreachable!("match determined above"),
                };

                // The new table statement, stealing the name and the export-specific fields from
                // the create source statement.
                let table = CreateTableFromSourceStatement {
                    name,
                    constraints: vec![],
                    columns,
                    if_not_exists: false,
                    source: source_ref,
                    external_reference,
                    with_options: table_with_options,
                    envelope,
                    include_metadata,
                    format,
                };

                // The source statement with a new name and many of its fields emptied
                let source = CreateSourceStatement {
                    connection: conn,
                    name: new_source_name,
                    if_not_exists,
                    in_cluster,
                    include_metadata: vec![],
                    format: None,
                    envelope: None,
                    col_names: vec![],
                    with_options,
                    key_constraint,
                    external_references,
                    progress_subsource,
                };

                let source_id = item.id;
                let source_global_id = item.global_id;
                let schema_id = item.schema_id.clone();
                let schema = tx.get_schema(&item.schema_id).expect("schema must exist");

                let owner_id = item.owner_id.clone();
                let privileges = item.privileges.clone();
                let extra_versions = item.extra_versions.clone();

                // Update the source statement in the catalog first, since the name will
                // otherwise conflict with the new table statement.
                info!("migrate: updated source {} to {source}", item.create_sql);
                item.create_sql = Statement::CreateSource(source).to_ast_string_stable();
                tx.update_item(item.id, item)?;

                // Insert the new table statement into the catalog with a new id.
                let ids = tx.allocate_user_item_ids(1)?;
                let (new_table_id, new_table_global_id) = ids[0];
                info!("migrate: added table {new_table_id}: {table}");
                tx.insert_user_item(
                    new_table_id,
                    new_table_global_id,
                    schema_id,
                    &table_item_name,
                    table.to_ast_string_stable(),
                    owner_id,
                    privileges,
                    &Default::default(),
                    extra_versions,
                )?;
                // We need to move the shard currently attached to the source statement to the
                // table statement such that the existing data in the shard is preserved and can
                // be queried on the new table statement. However, we need to keep the GlobalId of
                // the source the same, to preserve existing references to that statement in
                // external tools such as DBT and Terraform. We will insert a new shard for the source
                // statement which will be automatically created after the migration is complete.
                let new_source_shard = ShardId::new();
                let (source_global_id, existing_source_shard) = tx
                    .delete_collection_metadata(btreeset! {source_global_id})
                    .pop()
                    .expect("shard should exist");
                tx.insert_collection_metadata(btreemap! {
                    new_table_global_id => existing_source_shard,
                    source_global_id => new_source_shard
                })?;

                add_to_audit_log(
                    tx,
                    mz_audit_log::EventType::Create,
                    mz_audit_log::ObjectType::Table,
                    mz_audit_log::EventDetails::IdFullNameV1(mz_audit_log::IdFullNameV1 {
                        id: new_table_id.to_string(),
                        name: mz_audit_log::FullNameV1 {
                            database: schema
                                .database_id
                                .map(|d| d.to_string())
                                .unwrap_or_default(),
                            schema: schema.name,
                            item: table_item_name,
                        },
                    }),
                    now(),
                )?;

                // We also need to update any other statements that reference the source to use the new
                // table id/name instead.
                changed_ids.insert(source_id, new_table_id);
            }

            // TODO(sql_server2): Consider how to migrate SQL Server subsources
            // to the source table world.
            Statement::CreateSource(CreateSourceStatement {
                connection: CreateSourceConnection::SqlServer { .. },
                ..
            }) => (),

            #[expect(unreachable_patterns)]
            Statement::CreateSource(_) => {}
            _ => (),
        }
    }

    let mut updated_items = BTreeMap::new();
    for (mut item, mut statement) in items_with_statements_copied {
        match &statement {
            // Don't rewrite any of the statements we just migrated.
            Statement::CreateSource(_) => {}
            Statement::CreateSubsource(_) => {}
            Statement::CreateTableFromSource(_) => {}
            // We need to rewrite any statements that reference a source id to use the new
            // table id instead, since any contained data in the source will now be in the table.
            // This assumes the table has stolen the source's name, which is the case
            // for all sources that were migrated.
            _ => {
                if mz_sql::names::modify_dependency_item_ids(&mut statement, &changed_ids) {
                    info!("migrate: updated dependency reference in statement {statement}");
                    item.create_sql = statement.to_ast_string_stable();
                    updated_items.insert(item.id, item);
                }
            }
        }
    }
    if !updated_items.is_empty() {
        tx.update_items(updated_items)?;
    }

    Ok(())
}

// Durable migrations

/// Migrations that run only on the durable catalog before any data is loaded into memory.
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // Migrate the expression cache to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // Migrate the builtin migration shard to a new shard. We're updating the keys to use the
    // explicit binary version instead of the deploy generation.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }
    Ok(())
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.

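/// Appends a single audit log event to the durable catalog transaction, allocating
/// the next audit log ID for it.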
fn add_to_audit_log(
    tx: &mut Transaction,
    event_type: mz_audit_log::EventType,
    object_type: mz_audit_log::ObjectType,
    details: mz_audit_log::EventDetails,
    occurred_at: mz_ore::now::EpochMillis,
) -> Result<(), anyhow::Error> {
    let id = tx.get_and_increment_id(mz_catalog::durable::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
    let event =
        mz_audit_log::VersionedEvent::new(id, event_type, object_type, details, None, occurred_at);
    tx.insert_audit_log_event(event);
    Ok(())
}

// Remove PARTITION STRATEGY from CREATE SINK statements.
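// For example (option value and sink name are illustrative), a statement like
// `CREATE SINK s ... WITH (PARTITION STRATEGY = 'v1')` is rewritten to drop the
// `PARTITION STRATEGY` option while leaving all other options intact.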
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}