use std::collections::BTreeMap;

use base64::prelude::*;
use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::{MOCK_AUTHENTICATION_NONCE_KEY, Transaction};
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{
    CreateSinkOptionName, CreateViewStatement, CteBlock, DeferredItemName, IfExistsBehavior, Query,
    SetExpr, SqlServerConfigOptionName, ViewDefinition,
};
use mz_sql::catalog::SessionCatalog;
use mz_sql::names::{FullItemName, QualifiedItemName};
use mz_sql::normalize;
use mz_sql::session::vars::{FORCE_SOURCE_TABLE_SYNTAX, Var, VarInput};
use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::sources::SourceExportStatementDetails;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use prost::Message;
use semver::Version;
use tracing::info;
use uuid::Uuid;

use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};

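/// Key under which the catalog's most recently migrated version is stored in
/// the durable catalog's settings collection.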
const MIGRATION_VERSION_KEY: &str = "migration_version";

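/// Returns the version recorded by [`set_migration_version`], if any.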
pub(crate) fn get_migration_version(txn: &Transaction<'_>) -> Option<Version> {
    txn.get_setting(MIGRATION_VERSION_KEY.into())
        .map(|s| s.parse().expect("valid migration version"))
}

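/// Records `version` as the most recently migrated catalog version.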
pub(crate) fn set_migration_version(
    txn: &mut Transaction<'_>,
    version: Version,
) -> Result<(), mz_catalog::durable::CatalogError> {
    txn.set_setting(MIGRATION_VERSION_KEY.into(), Some(version.to_string()))
}

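/// Parses the `CREATE` statement of every item in `tx`, applies `f` to the
/// resulting AST, and writes the (possibly rewritten) statements back.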
fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

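/// Like [`rewrite_ast_items`], but additionally provides `f` with a catalog
/// handle for resolving names while rewriting each item.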
fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

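/// The result of running catalog migrations.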
pub(crate) struct MigrateResult {
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

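/// Migrates all user items, writing the rewritten statements back to `tx` and
/// applying them to `state`, and returns any builtin table updates produced
/// along the way.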
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    _now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = get_migration_version(tx).unwrap_or(Version::new(0, 0, 0));

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    rewrite_ast_items(tx, |_tx, _id, stmt| {
        ast_rewrite_create_sink_partition_strategy(stmt)?;
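        // Add new per-item AST migrations above this comment. Each migration
        // takes the raw AST of an item's `CREATE` statement and mutates it in
        // place; since they run on every startup, they must be no-ops on
        // statements that have already been migrated.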
        Ok(())
    })?;

    let commit_ts = tx.upper();
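    // Consolidate the durable item updates with any updates staged by the AST
    // migrations above, so that each item is loaded exactly once below.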
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

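    // `StorageCollectionMetadata` updates are not catalog items; set them
    // aside so the caller can apply them after the item updates.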
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();

    let force_source_table_syntax = state.system_config().force_source_table_syntax();
    if force_source_table_syntax {
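        // Temporarily disable the flag so that legacy source statements can
        // still be planned while the item updates are applied; it is switched
        // back on once the sources have been rewritten to tables below.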
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("off"))
            .expect("known parameter");
    }

    let mut ast_builtin_table_updates = state
        .apply_updates_for_bootstrap(item_updates, local_expr_cache)
        .await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    if force_source_table_syntax {
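        // Rewrite classic sources and subsources to the source-table model.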
        rewrite_sources_to_tables(tx, &conn_cat)?;
    }

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
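        // Add new catalog-aware item migrations here. Unlike the AST-only
        // migrations above, these can resolve names and inspect other catalog
        // state through `_conn_cat` while rewriting each statement.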
        Ok(())
    })?;

    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("on"))
            .expect("known parameter");
    }

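    // Apply any item updates staged by the migrations above to the in-memory
    // catalog state.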
    let op_item_updates = tx.get_and_commit_op_updates();
    let item_builtin_table_updates = state
        .apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
        .await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );
    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        post_item_updates,
    })
}

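/// Rewrites all sources and subsources in `tx` to conform to the
/// source-table model:
///
/// * The progress subsource of each source takes over as the `CREATE SOURCE`
///   item itself, with no attached subsources.
/// * Single-output sources (Kafka and single-table load generators) become
///   `CREATE TABLE ... FROM SOURCE` statements reading from that new source,
///   while multi-output sources become views over it.
/// * Every data-exporting subsource becomes a `CREATE TABLE ... FROM SOURCE`
///   statement.
///
/// Illustrative sketch (not exact SQL) for a Kafka source `s` (item `u1`)
/// with progress subsource `s_progress` (item `u2`):
///
/// ```text
/// before: CREATE SOURCE s ... (u1); CREATE SUBSOURCE s_progress ... (u2)
/// after:  CREATE SOURCE s_progress ... (u2);
///         CREATE TABLE s FROM SOURCE s_progress (REFERENCE <topic>) (u1)
/// ```
///
/// Sources that no longer reference a progress subsource have already been
/// migrated and are left untouched.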
fn rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    catalog: &ConnCatalog<'_>,
) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };

    let mut updated_items = BTreeMap::new();

    let mut sources = vec![];
    let mut subsources = vec![];

    for item in tx.get_items() {
        let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        match stmt {
            Statement::CreateSubsource(stmt) => subsources.push((item, stmt)),
            Statement::CreateSource(stmt) => sources.push((item, stmt)),
            _ => {}
        }
    }

    let mut pending_progress_items = BTreeMap::new();
    let mut migrated_source_ids = BTreeMap::new();
    for (mut source_item, source_stmt) in sources {
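        // Pull the legacy `CREATE SOURCE` statement apart so its pieces can
        // be redistributed between the new source and table statements.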
        let CreateSourceStatement {
            name,
            in_cluster,
            col_names,
            mut connection,
            include_metadata,
            format,
            envelope,
            if_not_exists,
            key_constraint,
            with_options,
            external_references,
            progress_subsource,
        } = source_stmt;

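        // Resolve the source's progress subsource. A source without one has
        // already been migrated and can be skipped.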
        let (progress_name, progress_item) = match progress_subsource {
            Some(DeferredItemName::Named(RawItemName::Name(name))) => {
                let partial_name = normalize::unresolved_item_name(name.clone())?;
                (name, catalog.resolve_item(&partial_name)?)
            }
            Some(DeferredItemName::Named(RawItemName::Id(id, name, _))) => {
                let gid = id.parse()?;
                (name, catalog.get_item(&gid))
            }
            Some(DeferredItemName::Deferred(_)) => {
                unreachable!("invalid progress subsource")
            }
            None => {
                info!("migrate: skipping already migrated source: {name}");
                continue;
            }
        };
        let raw_progress_name =
            RawItemName::Id(progress_item.id().to_string(), progress_name.clone(), None);

        let catalog_item = catalog.get_item(&source_item.id);
        let source_name: &QualifiedItemName = catalog_item.name();
        let full_source_name: FullItemName = catalog.resolve_full_name(source_name);
        let source_name: UnresolvedItemName = normalize::unresolve(full_source_name.clone());

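        // Drop text/exclude column options from the source's connection
        // options; the per-table statements carry this information instead.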
        match &mut connection {
            CreateSourceConnection::Postgres { options, .. } => {
                options.retain(|o| match o.name {
                    PgConfigOptionName::Details | PgConfigOptionName::Publication => true,
                    PgConfigOptionName::TextColumns | PgConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::SqlServer { options, .. } => {
                options.retain(|o| match o.name {
                    SqlServerConfigOptionName::Details => true,
                    SqlServerConfigOptionName::TextColumns
                    | SqlServerConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::MySql { options, .. } => {
                options.retain(|o| match o.name {
                    MySqlConfigOptionName::Details => true,
                    MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns => {
                        false
                    }
                });
            }
            CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. } => {
            }
        }

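        // Compute the replacement statements. In every case the progress
        // subsource item becomes the real `CREATE SOURCE`; what the original
        // source item becomes depends on the connection type.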
        let (new_progress_name, new_progress_stmt, new_source_name, new_source_stmt) =
            match connection {
                connection @ (CreateSourceConnection::Postgres { .. }
                | CreateSourceConnection::MySql { .. }
                | CreateSourceConnection::SqlServer { .. }
                | CreateSourceConnection::LoadGenerator {
                    generator:
                        LoadGenerator::Tpch | LoadGenerator::Auction | LoadGenerator::Marketing,
                    ..
                }) => {
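                    // Multi-output sources carry no inline schema, format, or
                    // metadata options; all of that lives on their subsources.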
                    assert_eq!(col_names, &[]);
                    assert_eq!(key_constraint, None);
                    assert_eq!(format, None);
                    assert_eq!(envelope, None);
                    assert_eq!(include_metadata, &[]);
                    assert_eq!(external_references, None);

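                    // The original source item is replaced by a view over the
                    // real source, which now lives at the progress item's ID.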
                    let dummy_source_stmt = Statement::CreateView(CreateViewStatement {
                        if_exists: IfExistsBehavior::Error,
                        temporary: false,
                        definition: ViewDefinition {
                            name: progress_name,
                            columns: vec![],
                            query: Query {
                                ctes: CteBlock::Simple(vec![]),
                                body: SetExpr::Table(RawItemName::Id(
                                    progress_item.id().to_string(),
                                    source_name.clone(),
                                    None,
                                )),
                                order_by: vec![],
                                limit: None,
                                offset: None,
                            },
                        },
                    });

                    let new_progress_stmt = CreateSourceStatement {
                        name: source_name.clone(),
                        in_cluster,
                        col_names: vec![],
                        connection,
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };

                    migrated_source_ids.insert(source_item.id, progress_item.id());

                    (
                        full_source_name.item,
                        new_progress_stmt,
                        progress_item.name().item.clone(),
                        dummy_source_stmt,
                    )
                }
                CreateSourceConnection::Kafka {
                    options,
                    connection,
                } => {
                    // Key constraints are intentionally not carried over to
                    // the new table statement.
                    let constraints = vec![];

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

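                    // The `DETAILS` option stores the statement details as
                    // hex-encoded protobuf.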
                    let details = SourceExportStatementDetails::Kafka {};
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
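                    // The new table's external reference is the Kafka topic.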
                    let topic_option = options
                        .iter()
                        .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                        .expect("kafka sources must have a topic");
                    let topic = match &topic_option.value {
                        Some(WithOptionValue::Value(Value::String(topic))) => topic,
                        _ => unreachable!("topic must be a string"),
                    };
                    let external_reference = UnresolvedItemName::qualified(&[Ident::new(topic)?]);

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::Kafka {
                            options,
                            connection,
                        },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
                CreateSourceConnection::LoadGenerator {
                    generator:
                        generator @ (LoadGenerator::Clock
                        | LoadGenerator::Counter
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue),
                    options,
                } => {
                    // Key constraints are intentionally not carried over to
                    // the new table statement.
                    let constraints = vec![];

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

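                    // These load generators have exactly one output: the
                    // default collection.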
                    let details = SourceExportStatementDetails::LoadGenerator {
                        output: LoadGeneratorOutput::Default,
                    };
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
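                    // Load generator references live in the synthetic load
                    // generator database, with the generator name as both
                    // schema and item.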
                    let external_reference = FullItemName {
                        database: mz_sql::names::RawDatabaseSpecifier::Name(
                            mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                .to_owned(),
                        ),
                        schema: generator.schema_name().to_string(),
                        item: generator.schema_name().to_string(),
                    };

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference.into()),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::LoadGenerator { generator, options },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
            };

        info!(
            "migrate: converted source {} to {}",
            source_item.create_sql, new_source_stmt
        );
        source_item.name = new_source_name;
        source_item.create_sql = new_source_stmt.to_ast_string_stable();
        updated_items.insert(source_item.id, source_item);
        pending_progress_items.insert(progress_item.id(), (new_progress_name, new_progress_stmt));
    }

    for (mut item, stmt) in subsources {
        match stmt {
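            // Progress subsources (those with no `OF SOURCE` clause) take
            // over as the new `CREATE SOURCE` item computed above.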
            CreateSubsourceStatement {
                of_source: None, ..
            } => {
                let Some((new_name, new_stmt)) = pending_progress_items.remove(&item.id) else {
                    panic!("encountered orphan progress subsource id: {}", item.id)
                };
                item.name = new_name;
                item.create_sql = new_stmt.to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
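            // Data-exporting subsources become `CREATE TABLE ... FROM SOURCE`
            // statements pointing at the migrated source's new ID.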
686 CreateSubsourceStatement {
689 name,
690 columns,
691 constraints,
692 of_source: Some(raw_source_name),
693 if_not_exists,
694 mut with_options,
695 } => {
696 let new_raw_source_name = match raw_source_name {
697 RawItemName::Id(old_id, name, None) => {
698 let old_id: CatalogItemId = old_id.parse().expect("well formed");
699 let new_id = migrated_source_ids[&old_id].clone();
700 RawItemName::Id(new_id.to_string(), name, None)
701 }
702 _ => unreachable!("unexpected source name: {raw_source_name}"),
703 };
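                // Pull the external reference out of the `WITH` options; it
                // becomes a top-level clause on the new table statement.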
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                let with_options = with_options
                    .into_iter()
                    .map(|option| match option.name {
                        CreateSubsourceOptionName::Details => TableFromSourceOption {
                            name: TableFromSourceOptionName::Details,
                            value: option.value,
                        },
                        CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                            name: TableFromSourceOptionName::TextColumns,
                            value: option.value,
                        },
                        CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                            name: TableFromSourceOptionName::ExcludeColumns,
                            value: option.value,
                        },
                        CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                            name: TableFromSourceOptionName::RetainHistory,
                            value: option.value,
                        },
                        CreateSubsourceOptionName::Progress => {
                            panic!("progress option should not exist on this subsource")
                        }
                        CreateSubsourceOptionName::ExternalReference => {
                            unreachable!("option handled separately above")
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source: new_raw_source_name,
                    external_reference: Some(external_reference),
                    with_options,
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
        }
    }
    assert!(
        pending_progress_items.is_empty(),
        "unexpected residual progress items: {pending_progress_items:?}"
    );

    tx.update_items(updated_items)?;

    Ok(())
}

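/// Migrations that run directly against the durable catalog, before the
/// in-memory catalog state has been built.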
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
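    // One-time migration: move the expression cache to a new shard,
    // finalizing the old one.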
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

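    // One-time migration: allocate a fresh builtin-migration shard,
    // finalizing the old one.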
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }

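    // Generate and persist a mock authentication nonce if none exists yet.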
    if tx
        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
        .is_none()
    {
        let mut nonce = [0u8; 24];
        openssl::rand::rand_bytes(&mut nonce).expect("failed to generate nonce");
        let nonce = BASE64_STANDARD.encode(nonce);
        tx.set_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string(), Some(nonce))?;
    }

    Ok(())
}

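/// Removes any `PARTITION STRATEGY` option from `CREATE SINK` statements; the
/// option is no longer supported.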
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}