mz_adapter/catalog/migrate.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use base64::prelude::*;
use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::{MOCK_AUTHENTICATION_NONCE_KEY, Transaction};
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::CreateSinkOptionName;
use mz_sql::ast::display::AstDisplay;
use mz_sql::names::FullItemName;
use mz_sql_parser::ast::{IdentError, Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use semver::Version;
use tracing::info;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};

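/// Parses the `create_sql` of every item in the catalog, applies `f` to the resulting
/// AST, and stages the (possibly mutated) statement back onto `tx`.
///
/// A minimal usage sketch, mirroring how `migrate` drives it below (the closure body
/// is illustrative only):
///
/// ```ignore
/// rewrite_ast_items(tx, |_tx, _id, stmt| {
///     // Mutate `stmt` in place; the updated `create_sql` is staged for commit.
///     ast_rewrite_create_sink_partition_strategy(stmt)?;
///     Ok(())
/// })?;
/// ```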
fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

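/// Like [`rewrite_ast_items`], but additionally hands the closure a [`ConnCatalog`] so
/// post-planning migrations can consult the in-memory catalog while rewriting.
///
/// A minimal usage sketch (illustrative only; see the call site in `migrate` below):
///
/// ```ignore
/// let conn_cat = state.for_system_session();
/// rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
///     // Mutate `_stmt` in place, optionally consulting `_conn_cat`.
///     Ok(())
/// })?;
/// ```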
fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a &ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, &cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

pub(crate) struct MigrateResult {
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

/// Migrates all user items and loads them into `state`.
///
/// Returns the builtin updates corresponding to all user items.
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = tx.get_catalog_content_version();
    let catalog_version = match catalog_version {
        Some(v) => Version::parse(v)?,
        None => Version::new(0, 0, 0),
    };

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    // Special block for the `ast_rewrite_sources_to_tables` migration, since it is
    // gated behind a feature flag and needs to update multiple AST items at once.
    if state.system_config().force_source_table_syntax() {
        ast_rewrite_sources_to_tables(tx, now)?;
    }

    rewrite_ast_items(tx, |_tx, _id, stmt| {
        // Add per-item AST migrations below.
        //
        // Each migration should be a function that takes `stmt` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `stmt` will be staged for commit to the catalog.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        Ok(())
    })?;

    // Load items into catalog. We make sure to consolidate the old updates with the new updates to
    // avoid trying to apply unmigrated items.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Since some migrations might introduce non-item 'post-item' updates, we sequester those
    // so they can be applied with other post-item updates after migrations to avoid
    // accumulating negative diffs.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        // The only post-item update kind we currently generate is to
        // update storage collection metadata.
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();
    let mut ast_builtin_table_updates = state
        .apply_updates_for_bootstrap(item_updates, local_expr_cache)
        .await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        // Add per-item, post-planning AST migrations below. Most
        // migrations should be in the above `rewrite_ast_items` block.
        //
        // Each migration should be a function that takes `item` (the AST
        // representing the creation SQL for the item) as input. Any
        // mutations to `item` will be staged for commit to the catalog.
        //
        // Be careful if you reference `conn_cat`. Doing so is *weird*,
        // as you'll be rewriting the catalog while looking at it. If
        // possible, make your migration independent of `conn_cat`, and only
        // consider a single item at a time.
        //
        // Migration functions may also take `tx` as input to stage
        // arbitrary changes to the catalog.
        Ok(())
    })?;

    // Add whole-catalog migrations below.
    //
    // Each migration should be a function that takes `tx` and `conn_cat` as
    // input and stages arbitrary transformations to the catalog on `tx`.

    let op_item_updates = tx.get_and_commit_op_updates();
    let item_builtin_table_updates = state
        .apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
        .await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );
    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        post_item_updates,
    })
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.
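//
// For illustration only, a new AST migration added under this heading would typically
// have the shape sketched below (the function name is hypothetical; see
// `ast_rewrite_create_sink_partition_strategy` at the bottom of this file for a real,
// minimal example). It would then be invoked from the `rewrite_ast_items` closure in
// `migrate` above:
//
//     fn ast_rewrite_example_migration_v0_0_0(
//         stmt: &mut Statement<Raw>,
//     ) -> Result<(), anyhow::Error> {
//         // ...mutate `stmt` in place; the caller stages the result for commit...
//         Ok(())
//     }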

/// Migrates all sources to use the new sources-as-tables model.
///
/// First we migrate existing `CREATE SUBSOURCE` statements, turning them into
/// `CREATE TABLE .. FROM SOURCE` statements. This covers existing Postgres,
/// MySQL, and multi-output (tpch, auction, marketing) load-generator subsources.
///
/// Second we migrate existing `CREATE SOURCE` statements for these multi-output
/// sources to remove any subsource-specific options (e.g. TEXT COLUMNS).
///
/// Third we migrate existing single-output `CREATE SOURCE` statements. This
/// includes existing Kafka and single-output load-generator sources. This will
/// generate an additional `CREATE TABLE .. FROM SOURCE` statement that copies
/// over all the export-specific options. This table will use the existing source
/// statement's persist shard but a new GlobalId. The original source statement
/// will be updated to remove the export-specific options, renamed to
/// `<original_name>_source`, and given a new, empty shard while keeping its
/// original GlobalId.
///
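/// As a rough illustration of the first step (the statement text is approximate, not
/// the exact AST rendering, and the object names are hypothetical), a subsource such as
///
/// ```text
/// CREATE SUBSOURCE "t" (a int4, b text) OF SOURCE "pg" WITH (EXTERNAL REFERENCE = "public"."t")
/// ```
///
/// is rewritten to roughly
///
/// ```text
/// CREATE TABLE "t" (a int4, b text) FROM SOURCE "pg" (REFERENCE "public"."t")
/// ```
///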
fn ast_rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    now: NowFn,
) -> Result<(), anyhow::Error> {
    use maplit::btreemap;
    use maplit::btreeset;
    use mz_persist_types::ShardId;
    use mz_proto::RustType;
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };
    use mz_storage_client::controller::StorageTxn;
    use mz_storage_types::sources::SourceExportStatementDetails;
    use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
    use prost::Message;

    let items_with_statements = tx
        .get_items()
        .map(|item| {
            let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
            Ok((item, stmt))
        })
        .collect::<Result<Vec<_>, anyhow::Error>>()?;
    let items_with_statements_copied = items_with_statements.clone();

    let item_names_per_schema = items_with_statements_copied
        .iter()
        .map(|(item, _)| (item.schema_id.clone(), &item.name))
        .fold(BTreeMap::new(), |mut acc, (schema_id, name)| {
            acc.entry(schema_id)
                .or_insert_with(|| btreeset! {})
                .insert(name);
            acc
        });

    // Maps each CatalogItemId that should be replaced to the new CatalogItemId that
    // statements referencing it should use instead. This is necessary to ensure that
    // downstream statements (e.g. materialized views, indexes) that reference a
    // single-output source (e.g. Kafka) now reference the corresponding new table,
    // which holds the same data.
    let mut changed_ids = BTreeMap::new();

    for (mut item, stmt) in items_with_statements {
        match stmt {
            // Migrate each `CREATE SUBSOURCE` statement to an equivalent
            // `CREATE TABLE ... FROM SOURCE` statement.
            Statement::CreateSubsource(CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source,
                if_not_exists,
                mut with_options,
            }) => {
                let raw_source_name = match of_source {
                    // If `of_source` is None then this is a `progress` subsource which we
                    // are not migrating as they are not currently relevant to the new table model.
                    None => continue,
                    Some(name) => name,
                };
                let source = match raw_source_name {
                    // Some legacy subsources have named-only references to their `of_source`
                    // so we ensure we always use an ID-based reference in the stored
                    // `CREATE TABLE ... FROM SOURCE` statements.
                    RawItemName::Name(name) => {
                        // Convert the name reference to an ID reference.
                        let (source_item, _) = items_with_statements_copied
                            .iter()
                            .find(|(_, statement)| match statement {
                                Statement::CreateSource(stmt) => stmt.name == name,
                                _ => false,
                            })
                            .expect("source must exist");
                        RawItemName::Id(source_item.id.to_string(), name, None)
                    }
                    RawItemName::Id(..) => raw_source_name,
                };

                // The external reference is a `with_option` on subsource statements but is a
                // separate field on table statements.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                // The `details` option on both subsources and tables is identical, using the same
                                // ProtoSourceExportStatementDetails serialized value.
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                                name: TableFromSourceOptionName::RetainHistory,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("This option is handled separately above.")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: mz_sql::ast::TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source,
                    external_reference: Some(external_reference.clone()),
                    with_options,
                    // Subsources don't have `envelope`, `include_metadata`, or `format` options.
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                tx.update_item(item.id, item)?;
            }

            // Postgres sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection: mut conn @ CreateSourceConnection::Postgres { .. },
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::Postgres { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // The option storing text columns on the primary source statement is redundant
                // with the option on the subsource statements, so it can simply be removed.
                // It was kept only for round-tripping of `CREATE SOURCE` statements that
                // automatically generated subsources, which is no longer necessary.
                if options
                    .iter()
                    .any(|o| matches!(o.name, PgConfigOptionName::TextColumns))
                {
                    options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted postgres source {stmt} to remove subsource options");
                }
            }
            // MySQL sources are multi-output sources whose subsources are
            // migrated above. All we need to do is remove the subsource-related
            // options from this statement since they are no longer relevant.
            Statement::CreateSource(CreateSourceStatement {
                connection: mut conn @ CreateSourceConnection::MySql { .. },
                name,
                if_not_exists,
                in_cluster,
                include_metadata,
                format,
                envelope,
                col_names,
                with_options,
                key_constraint,
                external_references,
                progress_subsource,
                ..
            }) => {
                let options = match &mut conn {
                    CreateSourceConnection::MySql { options, .. } => options,
                    _ => unreachable!("match determined above"),
                };
                // The options storing text and exclude columns on the primary source statement
                // are redundant with the options on the subsource statements, so they can simply
                // be removed. They were kept only for round-tripping of `CREATE SOURCE` statements
                // that automatically generated subsources, which is no longer necessary.
                if options.iter().any(|o| {
                    matches!(
                        o.name,
                        MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns
                    )
                }) {
                    options.retain(|o| {
                        !matches!(
                            o.name,
                            MySqlConfigOptionName::TextColumns
                                | MySqlConfigOptionName::ExcludeColumns
                        )
                    });
                    let stmt = Statement::CreateSource(CreateSourceStatement {
                        connection: conn,
                        name,
                        if_not_exists,
                        in_cluster,
                        include_metadata,
                        format,
                        envelope,
                        col_names,
                        with_options,
                        key_constraint,
                        external_references,
                        progress_subsource,
                    });
                    item.create_sql = stmt.to_ast_string_stable();
                    tx.update_item(item.id, item)?;
                    info!("migrate: converted mysql source {stmt} to remove subsource options");
                }
            }
            // Multi-output load generator sources whose subsources are already
            // migrated above. There is no need to remove any options from this
            // statement since they are not export-specific.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Auction | LoadGenerator::Marketing | LoadGenerator::Tpch,
                        ..
                    },
                ..
            }) => {}
            // Single-output sources that need to be migrated to tables. These sources currently output
            // data to the primary collection of the source statement. We will create a new table
            // statement for them and move all export-specific options over from the source statement,
            // while moving the `CREATE SOURCE` statement to a new name and moving its shard to the
            // new table statement.
            Statement::CreateSource(CreateSourceStatement {
                connection:
                    conn @ (CreateSourceConnection::Kafka { .. }
                    | CreateSourceConnection::LoadGenerator {
                        generator:
                            LoadGenerator::Clock
                            | LoadGenerator::Datums
                            | LoadGenerator::Counter
                            | LoadGenerator::KeyValue,
                        ..
                    }),
                name,
                col_names,
                include_metadata,
                format,
                envelope,
                with_options,
                if_not_exists,
                in_cluster,
                progress_subsource,
                external_references,
                key_constraint,
            }) => {
                // To check if this is a source that has already been migrated we use a basic
                // heuristic: if there is at least one existing table for the source, and if
                // the envelope/format/include_metadata options are empty, we assume it's
                // already been migrated.
                let tables_for_source =
                    items_with_statements_copied
                        .iter()
                        .any(|(_, statement)| match statement {
                            Statement::CreateTableFromSource(stmt) => {
                                let source: CatalogItemId = match &stmt.source {
                                    RawItemName::Name(_) => {
                                        unreachable!("tables store source as ID")
                                    }
                                    RawItemName::Id(source_id, _, _) => {
                                        source_id.parse().expect("valid id")
                                    }
                                };
                                source == item.id
                            }
                            _ => false,
                        });
                if tables_for_source
                    && envelope.is_none()
                    && format.is_none()
                    && include_metadata.is_empty()
                {
                    info!("migrate: skipping already migrated source: {}", name);
                    continue;
                }

                // Use the current source name as the new table name, and rename the source to
                // `<source_name>_source`. This lets existing queries that reference the source
                // name keep working: the data they expect now lives in the table, which takes
                // over that name.

                assert_eq!(
                    item.name,
                    name.0.last().expect("at least one ident").to_string()
                );
                // First find an unused name within the same schema to avoid conflicts.
                let is_valid = |new_source_ident: &Ident| {
                    if item_names_per_schema
                        .get(&item.schema_id)
                        .expect("schema must exist")
                        .contains(&new_source_ident.to_string())
                    {
                        Ok::<_, IdentError>(false)
                    } else {
                        Ok(true)
                    }
                };
                let new_source_ident =
                    Ident::try_generate_name(item.name.clone(), "_source", is_valid)?;

                // We will use the original item name for the new table item.
                let table_item_name = item.name.clone();

                // Update the source item/statement to use the new name.
                let mut new_source_name = name.clone();
                *new_source_name.0.last_mut().expect("at least one ident") =
                    new_source_ident.clone();
                item.name = new_source_ident.to_string();

                // A reference to the source that will be included in the table statement
                let source_ref =
                    RawItemName::Id(item.id.to_string(), new_source_name.clone(), None);

                let columns = if col_names.is_empty() {
                    TableFromSourceColumns::NotSpecified
                } else {
                    TableFromSourceColumns::Named(col_names)
                };

                // All source tables must have a `details` option, which is a serialized proto
                // describing any source-specific details for this table statement.
                let details = match &conn {
                    // For kafka sources this proto is currently empty.
                    CreateSourceConnection::Kafka { .. } => SourceExportStatementDetails::Kafka {},
                    CreateSourceConnection::LoadGenerator { .. } => {
                        // Since these load generators are single-output we use the default output.
                        SourceExportStatementDetails::LoadGenerator {
                            output: LoadGeneratorOutput::Default,
                        }
                    }
                    _ => unreachable!("match determined above"),
                };
                let table_with_options = vec![TableFromSourceOption {
                    name: TableFromSourceOptionName::Details,
                    value: Some(WithOptionValue::Value(Value::String(hex::encode(
                        details.into_proto().encode_to_vec(),
                    )))),
                }];

                // Generate the same external-reference that would have been generated
                // during purification for single-output sources.
                let external_reference = match &conn {
                    CreateSourceConnection::Kafka { options, .. } => {
                        let topic_option = options
                            .iter()
                            .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                            .expect("kafka sources must have a topic");
                        let topic = match &topic_option.value {
                            Some(WithOptionValue::Value(Value::String(topic))) => topic,
                            _ => unreachable!("topic must be a string"),
                        };

                        Some(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
                    }
                    CreateSourceConnection::LoadGenerator { generator, .. } => {
                        // Since these load generators are single-output the external reference
                        // uses the schema-name for both namespace and name.
                        let name = FullItemName {
                            database: mz_sql::names::RawDatabaseSpecifier::Name(
                                mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                    .to_owned(),
                            ),
                            schema: generator.schema_name().to_string(),
                            item: generator.schema_name().to_string(),
                        };
                        Some(UnresolvedItemName::from(name))
                    }
                    _ => unreachable!("match determined above"),
                };

                // The new table statement, stealing the name and the export-specific fields from
                // the create source statement.
                let table = CreateTableFromSourceStatement {
                    name,
                    constraints: vec![],
                    columns,
                    if_not_exists: false,
                    source: source_ref,
                    external_reference,
                    with_options: table_with_options,
                    envelope,
                    include_metadata,
                    format,
                };

                // The source statement with a new name and many of its fields emptied
                let source = CreateSourceStatement {
                    connection: conn,
                    name: new_source_name,
                    if_not_exists,
                    in_cluster,
                    include_metadata: vec![],
                    format: None,
                    envelope: None,
                    col_names: vec![],
                    with_options,
                    key_constraint,
                    external_references,
                    progress_subsource,
                };

                let source_id = item.id;
                let source_global_id = item.global_id;
                let schema_id = item.schema_id.clone();
                let schema = tx.get_schema(&item.schema_id).expect("schema must exist");

                let owner_id = item.owner_id.clone();
                let privileges = item.privileges.clone();
                let extra_versions = item.extra_versions.clone();

                // Update the source statement in the catalog first, since the name will
                // otherwise conflict with the new table statement.
                info!("migrate: updated source {} to {source}", item.create_sql);
                item.create_sql = Statement::CreateSource(source).to_ast_string_stable();
                tx.update_item(item.id, item)?;

                // Insert the new table statement into the catalog with a new id.
                let ids = tx.allocate_user_item_ids(1)?;
                let (new_table_id, new_table_global_id) = ids[0];
                info!("migrate: added table {new_table_id}: {table}");
                tx.insert_user_item(
                    new_table_id,
                    new_table_global_id,
                    schema_id,
                    &table_item_name,
                    table.to_ast_string_stable(),
                    owner_id,
                    privileges,
                    &Default::default(),
                    extra_versions,
                )?;
                // We need to move the shard currently attached to the source statement to the
                // table statement such that the existing data in the shard is preserved and can
                // be queried on the new table statement. However, we need to keep the GlobalId of
                // the source the same, to preserve existing references to that statement in
                // external tools such as DBT and Terraform. We will insert a new shard for the source
                // statement which will be automatically created after the migration is complete.
                let new_source_shard = ShardId::new();
                let (source_global_id, existing_source_shard) = tx
                    .delete_collection_metadata(btreeset! {source_global_id})
                    .pop()
                    .expect("shard should exist");
                tx.insert_collection_metadata(btreemap! {
                    new_table_global_id => existing_source_shard,
                    source_global_id => new_source_shard
                })?;

                add_to_audit_log(
                    tx,
                    mz_audit_log::EventType::Create,
                    mz_audit_log::ObjectType::Table,
                    mz_audit_log::EventDetails::IdFullNameV1(mz_audit_log::IdFullNameV1 {
                        id: new_table_id.to_string(),
                        name: mz_audit_log::FullNameV1 {
                            database: schema
                                .database_id
                                .map(|d| d.to_string())
                                .unwrap_or_default(),
                            schema: schema.name,
                            item: table_item_name,
                        },
                    }),
                    now(),
                )?;

                // We also need to update any other statements that reference the source to use the new
                // table id/name instead.
                changed_ids.insert(source_id, new_table_id);
            }

            // TODO(sql_server2): Consider how to migrate SQL Server subsources
            // to the source table world.
            Statement::CreateSource(CreateSourceStatement {
                connection: CreateSourceConnection::SqlServer { .. },
                ..
            }) => (),

            #[expect(unreachable_patterns)]
            Statement::CreateSource(_) => {}
            _ => (),
        }
    }

    let mut updated_items = BTreeMap::new();
    for (mut item, mut statement) in items_with_statements_copied {
        match &statement {
            // Don’t rewrite any of the statements we just migrated.
            Statement::CreateSource(_) => {}
            Statement::CreateSubsource(_) => {}
            Statement::CreateTableFromSource(_) => {}
            // We need to rewrite any statements that reference a source id to use the new
            // table id instead, since any contained data in the source will now be in the table.
            // This assumes the table has stolen the source's name, which is the case
            // for all sources that were migrated.
            _ => {
                if mz_sql::names::modify_dependency_item_ids(&mut statement, &changed_ids) {
                    info!("migrate: updated dependency reference in statement {statement}");
                    item.create_sql = statement.to_ast_string_stable();
                    updated_items.insert(item.id, item);
                }
            }
        }
    }
    if !updated_items.is_empty() {
        tx.update_items(updated_items)?;
    }

    Ok(())
}

// Durable migrations

/// Migrations that run only on the durable catalog before any data is loaded into memory.
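///
/// The shard migrations below are one-shot changes gated on a key in the config
/// collection, roughly following this pattern (sketch only; the key name is
/// hypothetical):
///
/// ```ignore
/// const MY_MIGRATION_KEY: &str = "my_migration";
/// const MY_MIGRATION_DONE: u64 = 1;
/// if tx.get_config(MY_MIGRATION_KEY.to_string()) != Some(MY_MIGRATION_DONE) {
///     // ...stage the durable change on `tx`...
///     tx.set_config(MY_MIGRATION_KEY.to_string(), Some(MY_MIGRATION_DONE))?;
/// }
/// ```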
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // Migrate the expression cache to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // Migrate the builtin migration shard to a new shard. We're updating the keys to use the explicit
    // binary version instead of the deploy generation.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }

    if tx
        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
        .is_none()
    {
        let mut nonce = [0u8; 24];
        let _ = openssl::rand::rand_bytes(&mut nonce).expect("failed to generate nonce");
        let nonce = BASE64_STANDARD.encode(nonce);
        tx.set_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string(), Some(nonce))?;
    }

    Ok(())
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
//
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
//
// Please include the adapter team on any code reviews that add or edit
// migrations.

fn add_to_audit_log(
    tx: &mut Transaction,
    event_type: mz_audit_log::EventType,
    object_type: mz_audit_log::ObjectType,
    details: mz_audit_log::EventDetails,
    occurred_at: mz_ore::now::EpochMillis,
) -> Result<(), anyhow::Error> {
    let id = tx.get_and_increment_id(mz_catalog::durable::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
    let event =
        mz_audit_log::VersionedEvent::new(id, event_type, object_type, details, None, occurred_at);
    tx.insert_audit_log_event(event);
    Ok(())
}

// Remove PARTITION STRATEGY from CREATE SINK statements.
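// For illustration (the option value shown is hypothetical): a sink whose options
// include `WITH (PARTITION STRATEGY = 'v1', ...)` is rewritten with the
// `PARTITION STRATEGY` entry dropped; all other `WITH` options are preserved.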
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}