1use std::collections::{BTreeMap, BTreeSet};
11
12use base64::prelude::*;
13use maplit::btreeset;
14use mz_catalog::builtin::{BUILTINS, BuiltinTable};
15use mz_catalog::durable::objects::{SystemObjectDescription, SystemObjectMapping};
16use mz_catalog::durable::{MOCK_AUTHENTICATION_NONCE_KEY, Transaction};
17use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
18use mz_ore::collections::CollectionExt;
19use mz_ore::now::NowFn;
20use mz_persist_types::ShardId;
21use mz_proto::RustType;
22use mz_repr::{CatalogItemId, Diff, Timestamp};
23use mz_sql::ast::display::AstDisplay;
24use mz_sql::ast::{
25 CreateSinkOptionName, CreateViewStatement, CteBlock, DeferredItemName, IfExistsBehavior, Query,
26 SetExpr, SqlServerConfigOptionName, ViewDefinition,
27};
28use mz_sql::catalog::{CatalogItemType, SessionCatalog};
29use mz_sql::names::{FullItemName, QualifiedItemName};
30use mz_sql::normalize;
31use mz_sql::session::vars::{FORCE_SOURCE_TABLE_SYNTAX, Var, VarInput};
32use mz_sql_parser::ast::{Raw, Statement};
33use mz_storage_client::controller::StorageTxn;
34use mz_storage_types::sources::SourceExportStatementDetails;
35use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
36use prost::Message;
37use semver::Version;
38use tracing::info;
39use uuid::Uuid;
40
41use crate::catalog::open::into_consolidatable_updates_startup;
43use crate::catalog::state::LocalExpressionCache;
44use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};
45use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
46
/// Durable setting key under which the last successful statement-migration
/// version is recorded (see `get_migration_version` / `set_migration_version`).
const MIGRATION_VERSION_KEY: &str = "migration_version";
57
58pub(crate) fn get_migration_version(txn: &Transaction<'_>) -> Option<Version> {
59 txn.get_setting(MIGRATION_VERSION_KEY.into())
60 .map(|s| s.parse().expect("valid migration version"))
61}
62
63pub(crate) fn set_migration_version(
64 txn: &mut Transaction<'_>,
65 version: Version,
66) -> Result<(), mz_catalog::durable::CatalogError> {
67 txn.set_setting(MIGRATION_VERSION_KEY.into(), Some(version.to_string()))
68}
69
70fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
71where
72 F: for<'a> FnMut(
73 &'a mut Transaction<'_>,
74 CatalogItemId,
75 &'a mut Statement<Raw>,
76 ) -> Result<(), anyhow::Error>,
77{
78 let mut updated_items = BTreeMap::new();
79
80 for mut item in tx.get_items() {
81 let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
82 f(tx, item.id, &mut stmt)?;
83
84 item.create_sql = stmt.to_ast_string_stable();
85
86 updated_items.insert(item.id, item);
87 }
88 tx.update_items(updated_items)?;
89 Ok(())
90}
91
92fn rewrite_items<F>(
93 tx: &mut Transaction<'_>,
94 cat: &ConnCatalog<'_>,
95 mut f: F,
96) -> Result<(), anyhow::Error>
97where
98 F: for<'a> FnMut(
99 &'a mut Transaction<'_>,
100 &'a &ConnCatalog<'_>,
101 CatalogItemId,
102 &'a mut Statement<Raw>,
103 ) -> Result<(), anyhow::Error>,
104{
105 let mut updated_items = BTreeMap::new();
106 let items = tx.get_items();
107 for mut item in items {
108 let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
109
110 f(tx, &cat, item.id, &mut stmt)?;
111
112 item.create_sql = stmt.to_ast_string_stable();
113
114 updated_items.insert(item.id, item);
115 }
116 tx.update_items(updated_items)?;
117 Ok(())
118}
119
/// The outputs of a statement migration (`migrate`), to be applied by the
/// caller.
pub(crate) struct MigrateResult {
    // Updates to builtin system tables produced while applying migrated items.
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    // Parsed in-memory catalog state updates produced by the migration.
    pub(crate) catalog_updates: Vec<ParsedStateUpdate>,
    // Updates that were deliberately split out of item processing (currently
    // storage collection metadata) and must be applied separately afterwards.
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}
125
/// Migrates all user items and loads them into `state`.
///
/// First applies pure-AST rewrites to every item's `create_sql`, consolidates
/// the resulting updates with the caller-provided `item_updates`, applies them
/// to the in-memory catalog, and then runs rewrites that need a populated
/// catalog for name resolution. Returns the builtin-table and catalog updates
/// produced along the way, plus updates that must be applied after item
/// processing (see `MigrateResult`).
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    _now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    // A missing recorded version means no migration has ever run; treat it as
    // 0.0.0.
    let catalog_version = get_migration_version(tx).unwrap_or(Version::new(0, 0, 0));

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    // First pass: rewrites that only need the statement AST (no catalog).
    rewrite_ast_items(tx, |tx, _id, stmt| {
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        ast_rewrite_sql_server_constraints(stmt)?;
        ast_rewrite_add_missing_index_ids(tx, stmt)?;
        Ok(())
    })?;

    // Merge the op updates produced by the rewrites above into the
    // caller-provided item updates and consolidate, so each item is
    // represented at most once.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Split out storage collection metadata: it is returned to the caller as
    // `post_item_updates` rather than applied with the items here.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();

    // Temporarily disable forced source-table syntax while the (not yet
    // rewritten) items are applied; it is re-enabled after `rewrite_items`
    // below. NOTE(review): presumably so legacy source statements still plan
    // during this window — confirm against FORCE_SOURCE_TABLE_SYNTAX docs.
    let force_source_table_syntax = state.system_config().force_source_table_syntax();
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("off"))
            .expect("known parameter");
    }

    let (mut ast_builtin_table_updates, mut ast_catalog_updates) =
        state.apply_updates(item_updates, local_expr_cache).await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    // Convert legacy sources/subsources into the table-from-source syntax.
    if force_source_table_syntax {
        rewrite_sources_to_tables(tx, &conn_cat)?;
    }

    // Second pass: catalog-aware rewrites. Currently a no-op placeholder.
    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        Ok(())
    })?;

    // Restore the flag that was disabled above.
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("on"))
            .expect("known parameter");
    }

    // Apply whatever op updates the catalog-aware rewrites produced and fold
    // their outputs into the results from the first apply pass.
    let op_item_updates = tx.get_and_commit_op_updates();
    let (item_builtin_table_updates, item_catalog_updates) =
        state.apply_updates(op_item_updates, local_expr_cache).await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);
    ast_catalog_updates.extend(item_catalog_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );

    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        catalog_updates: ast_catalog_updates,
        post_item_updates,
    })
}
261
/// Rewrites legacy `CREATE SOURCE` / `CREATE SUBSOURCE` statements into the
/// table-from-source syntax.
///
/// For multi-output sources (Postgres/MySQL/SQL Server and the multi-table
/// load generators) the source definition is moved onto the id of its former
/// progress subsource, and each data subsource becomes a
/// `CREATE TABLE ... FROM SOURCE` referencing that id. For single-output
/// sources (Kafka and single-table load generators) the source item itself
/// becomes the table statement and the progress item becomes the source.
fn rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    catalog: &ConnCatalog<'_>,
) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };

    let mut updated_items = BTreeMap::new();

    // Bucket every item by statement kind; other statements are untouched.
    let mut sources = vec![];
    let mut subsources = vec![];

    for item in tx.get_items() {
        let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        match stmt {
            Statement::CreateSubsource(stmt) => subsources.push((item, stmt)),
            Statement::CreateSource(stmt) => sources.push((item, stmt)),
            _ => {}
        }
    }

    // progress item id -> (new name, new statement) for progress subsources
    // whose rewrite is produced while walking the sources below.
    let mut pending_progress_items = BTreeMap::new();
    // old source item id -> progress item id, used to repoint data subsources.
    let mut migrated_source_ids = BTreeMap::new();
    for (mut source_item, source_stmt) in sources {
        let CreateSourceStatement {
            name,
            in_cluster,
            col_names,
            mut connection,
            include_metadata,
            format,
            envelope,
            if_not_exists,
            key_constraint,
            with_options,
            external_references,
            progress_subsource,
        } = source_stmt;

        // Resolve the progress subsource; a source without one has already
        // been migrated and is skipped.
        let (progress_name, progress_item) = match progress_subsource {
            Some(DeferredItemName::Named(RawItemName::Name(name))) => {
                let partial_name = normalize::unresolved_item_name(name.clone())?;
                (name, catalog.resolve_item(&partial_name)?)
            }
            Some(DeferredItemName::Named(RawItemName::Id(id, name, _))) => {
                let gid = id.parse()?;
                (name, catalog.get_item(&gid))
            }
            Some(DeferredItemName::Deferred(_)) => {
                unreachable!("invalid progress subsource")
            }
            None => {
                info!("migrate: skipping already migrated source: {name}");
                continue;
            }
        };
        let raw_progress_name =
            RawItemName::Id(progress_item.id().to_string(), progress_name.clone(), None);

        let catalog_item = catalog.get_item(&source_item.id);
        let source_name: &QualifiedItemName = catalog_item.name();
        let full_source_name: FullItemName = catalog.resolve_full_name(source_name);
        let source_name: UnresolvedItemName = normalize::unresolve(full_source_name.clone());

        // Drop per-export options (text/exclude columns) from the connection;
        // those move onto the individual table statements.
        match &mut connection {
            CreateSourceConnection::Postgres { options, .. } => {
                options.retain(|o| match o.name {
                    PgConfigOptionName::Details | PgConfigOptionName::Publication => true,
                    PgConfigOptionName::TextColumns | PgConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::SqlServer { options, .. } => {
                options.retain(|o| match o.name {
                    SqlServerConfigOptionName::Details => true,
                    SqlServerConfigOptionName::TextColumns
                    | SqlServerConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::MySql { options, .. } => {
                options.retain(|o| match o.name {
                    MySqlConfigOptionName::Details => true,
                    MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns => {
                        false
                    }
                });
            }
            CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. } => {
            }
        }

        let (new_progress_name, new_progress_stmt, new_source_name, new_source_stmt) =
            match connection {
                // Multi-output sources: the data lives in subsources, so the
                // source item itself carries no export. Move the source
                // definition onto the progress item's id (under the source's
                // name) and leave a placeholder view statement on the old
                // source item (under the progress item's name).
                connection @ (CreateSourceConnection::Postgres { .. }
                | CreateSourceConnection::MySql { .. }
                | CreateSourceConnection::SqlServer { .. }
                | CreateSourceConnection::LoadGenerator {
                    generator:
                        LoadGenerator::Tpch | LoadGenerator::Auction | LoadGenerator::Marketing,
                    ..
                }) => {
                    // These source kinds never carry export-level settings.
                    assert_eq!(col_names, &[]);
                    assert_eq!(key_constraint, None);
                    assert_eq!(format, None);
                    assert_eq!(envelope, None);
                    assert_eq!(include_metadata, &[]);
                    assert_eq!(external_references, None);

                    // Placeholder that selects from the relocated source by id.
                    let dummy_source_stmt = Statement::CreateView(CreateViewStatement {
                        if_exists: IfExistsBehavior::Error,
                        temporary: false,
                        definition: ViewDefinition {
                            name: progress_name,
                            columns: vec![],
                            query: Query {
                                ctes: CteBlock::Simple(vec![]),
                                body: SetExpr::Table(RawItemName::Id(
                                    progress_item.id().to_string(),
                                    source_name.clone(),
                                    None,
                                )),
                                order_by: vec![],
                                limit: None,
                                offset: None,
                            },
                        },
                    });

                    let new_progress_stmt = CreateSourceStatement {
                        name: source_name.clone(),
                        in_cluster,
                        col_names: vec![],
                        connection,
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };

                    // Data subsources of this source must be repointed to the
                    // progress item's id in the second loop below.
                    migrated_source_ids.insert(source_item.id, progress_item.id());

                    (
                        full_source_name.item,
                        new_progress_stmt,
                        progress_item.name().item.clone(),
                        dummy_source_stmt,
                    )
                }
                // Kafka: single-output. The source item becomes a
                // table-from-source reading from the relocated source.
                CreateSourceConnection::Kafka {
                    options,
                    connection,
                } => {
                    // NOTE(review): both branches yield an empty constraint
                    // list; any key constraint is effectively dropped here —
                    // confirm this is intentional.
                    let constraints = if let Some(_key_constraint) = key_constraint {
                        vec![]
                    } else {
                        vec![]
                    };

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // The table carries hex/proto-encoded export details.
                    let details = SourceExportStatementDetails::Kafka {};
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
                    // The topic becomes the table's external reference.
                    let topic_option = options
                        .iter()
                        .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                        .expect("kafka sources must have a topic");
                    let topic = match &topic_option.value {
                        Some(WithOptionValue::Value(Value::String(topic))) => topic,
                        _ => unreachable!("topic must be a string"),
                    };
                    let external_reference = UnresolvedItemName::qualified(&[Ident::new(topic)?]);

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::Kafka {
                            options,
                            connection,
                        },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
                // Single-output load generators: same shape as Kafka above.
                CreateSourceConnection::LoadGenerator {
                    generator:
                        generator @ (LoadGenerator::Clock
                        | LoadGenerator::Counter
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue),
                    options,
                } => {
                    // NOTE(review): as in the Kafka arm, any key constraint is
                    // dropped — confirm intentional.
                    let constraints = if let Some(_key_constraint) = key_constraint {
                        vec![]
                    } else {
                        vec![]
                    };

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    let details = SourceExportStatementDetails::LoadGenerator {
                        output: LoadGeneratorOutput::Default,
                    };
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
                    // Synthetic external reference inside the load-generator
                    // pseudo-database.
                    let external_reference = FullItemName {
                        database: mz_sql::names::RawDatabaseSpecifier::Name(
                            mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                .to_owned(),
                        ),
                        schema: generator.schema_name().to_string(),
                        item: generator.schema_name().to_string(),
                    };

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference.into()),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::LoadGenerator { generator, options },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
            };

        info!(
            "migrate: converted source {} to {}",
            source_item.create_sql, new_source_stmt
        );
        source_item.name = new_source_name.clone();
        source_item.create_sql = new_source_stmt.to_ast_string_stable();
        updated_items.insert(source_item.id, source_item);
        // The progress item's rewrite is applied in the subsource loop below.
        pending_progress_items.insert(progress_item.id(), (new_progress_name, new_progress_stmt));
    }

    for (mut item, stmt) in subsources {
        match stmt {
            // No `of_source`: this is a progress subsource; install the
            // rewrite staged for it above.
            CreateSubsourceStatement {
                of_source: None, ..
            } => {
                let Some((new_name, new_stmt)) = pending_progress_items.remove(&item.id) else {
                    panic!("encountered orphan progress subsource id: {}", item.id)
                };
                item.name = new_name;
                item.create_sql = new_stmt.to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
            // Data subsource: becomes a table-from-source pointing at the
            // migrated source's new id.
            CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source: Some(raw_source_name),
                if_not_exists,
                mut with_options,
            } => {
                let new_raw_source_name = match raw_source_name {
                    RawItemName::Id(old_id, name, None) => {
                        let old_id: CatalogItemId = old_id.parse().expect("well formed");
                        let new_id = migrated_source_ids[&old_id].clone();
                        RawItemName::Id(new_id.to_string(), name, None)
                    }
                    _ => unreachable!("unexpected source name: {raw_source_name}"),
                };
                // The external reference moves from a WITH option to a
                // dedicated statement field.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                // Translate the remaining subsource options 1:1.
                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                                name: TableFromSourceOptionName::RetainHistory,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("This option is handled separately above.")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source: new_raw_source_name,
                    external_reference: Some(external_reference),
                    with_options,
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
        }
    }
    // Every staged progress rewrite must have found its subsource item.
    assert!(
        pending_progress_items.is_empty(),
        "unexpected residual progress items: {pending_progress_items:?}"
    );

    tx.update_items(updated_items)?;

    Ok(())
}
787
/// Migrations that operate directly on the durable catalog, before any state
/// is loaded into memory.
///
/// Each step is guarded by a config/setting key so that it runs exactly once
/// per environment.
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // One-time rotation of the expression cache shard: finalize the old shard
    // and allocate a fresh one.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // One-time rotation of the builtin migration shard, same pattern as above.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }

    // Generate and persist a mock authentication nonce (24 random bytes,
    // base64-encoded) if one has never been stored.
    if tx
        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
        .is_none()
    {
        let mut nonce = [0u8; 24];
        openssl::rand::rand_bytes(&mut nonce).expect("failed to generate nonce");
        let nonce = BASE64_STANDARD.encode(nonce);
        tx.set_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string(), Some(nonce))?;
    }

    // Re-type builtin tables that have since become materialized views.
    migrate_builtin_tables_to_mvs(tx)?;

    Ok(())
}
842
843fn migrate_builtin_tables_to_mvs(tx: &mut Transaction) -> Result<(), anyhow::Error> {
847 let expected_mvs: BTreeSet<_> = BUILTINS::materialized_views()
849 .map(|mv| (mv.schema, mv.name))
850 .collect();
851
852 let mut to_remove = BTreeSet::new();
854 let mut to_add = Vec::new();
855 for mapping in tx.get_system_object_mappings() {
856 let desc = &mapping.description;
857 if desc.object_type != CatalogItemType::Table {
858 continue;
859 }
860
861 let key = (&*desc.schema_name, &*desc.object_name);
862 if expected_mvs.contains(&key) {
863 info!(
864 "migrate: builtin {}.{} changed type from table to MV",
865 desc.schema_name, desc.object_name,
866 );
867 to_remove.insert(desc.clone());
868 to_add.push(SystemObjectMapping {
869 description: SystemObjectDescription {
870 schema_name: desc.schema_name.clone(),
871 object_type: CatalogItemType::MaterializedView,
872 object_name: desc.object_name.clone(),
873 },
874 unique_identifier: mapping.unique_identifier,
875 });
876 }
877 }
878
879 if !to_remove.is_empty() {
880 tx.remove_system_object_mappings(to_remove)?;
881 tx.set_system_object_mappings(to_add)?;
882 }
883
884 Ok(())
885}
886
887fn ast_rewrite_create_sink_partition_strategy(
899 stmt: &mut Statement<Raw>,
900) -> Result<(), anyhow::Error> {
901 let Statement::CreateSink(stmt) = stmt else {
902 return Ok(());
903 };
904 stmt.with_options
905 .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
906 Ok(())
907}
908
/// Moves SQL Server primary-key information from per-column
/// `primary_key_constraint` fields into table-level constraints, rewriting the
/// hex/proto-encoded `DETAILS` option of subsource and table-from-source
/// statements in place. Statements without SQL Server details are untouched.
fn ast_rewrite_sql_server_constraints(stmt: &mut Statement<Raw>) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSubsourceOptionName, TableFromSourceOptionName, Value, WithOptionValue,
    };
    use mz_sql_server_util::desc::{SqlServerTableConstraint, SqlServerTableConstraintType};
    use mz_storage_types::sources::ProtoSourceExportStatementDetails;
    use mz_storage_types::sources::proto_source_export_statement_details::Kind;

    // Locate the mutable string holding the hex-encoded details proto, if any.
    let deets: Option<&mut String> = match stmt {
        Statement::CreateSubsource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
            if matches!(option.name, CreateSubsourceOptionName::Details)
                && let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
            {
                Some(details)
            } else {
                None
            }
        }),
        Statement::CreateTableFromSource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
            if matches!(option.name, TableFromSourceOptionName::Details)
                && let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
            {
                Some(details)
            } else {
                None
            }
        }),
        _ => None,
    };
    let Some(deets) = deets else {
        return Ok(());
    };

    // Decode hex -> proto to inspect the details.
    let current_value = hex::decode(&mut *deets)?;
    let current_value = ProtoSourceExportStatementDetails::decode(&*current_value)?;

    // Only SQL Server exports carry the constraint data being migrated.
    if !matches!(current_value.kind, Some(Kind::SqlServer(_))) {
        return Ok(());
    };

    let SourceExportStatementDetails::SqlServer {
        mut table,
        capture_instance,
        initial_lsn,
    } = SourceExportStatementDetails::from_proto(current_value)?
    else {
        unreachable!("statement details must exist for SQL Server");
    };

    // Table-level constraints already present means this statement was
    // already migrated.
    if !table.constraints.is_empty() {
        return Ok(());
    }

    // Group primary-key columns by constraint name, clearing the per-column
    // field as we go. BTreeMap keeps the resulting constraint order stable.
    let mut migrated_constraints: BTreeMap<_, Vec<_>> = BTreeMap::new();
    for col in table.columns.iter_mut() {
        if let Some(constraint_name) = col.primary_key_constraint.take() {
            migrated_constraints
                .entry(constraint_name)
                .or_default()
                .push(col.name.to_string());
        }
    }

    table.constraints = migrated_constraints
        .into_iter()
        .map(|(constraint_name, column_names)| SqlServerTableConstraint {
            constraint_name: constraint_name.to_string(),
            constraint_type: SqlServerTableConstraintType::PrimaryKey,
            column_names,
        })
        .collect();

    // Re-encode and write the rewritten details back into the statement.
    let new_value = SourceExportStatementDetails::SqlServer {
        table,
        capture_instance,
        initial_lsn,
    };
    *deets = hex::encode(new_value.into_proto().encode_to_vec());

    Ok(())
}
995
996fn ast_rewrite_add_missing_index_ids(
998 tx: &Transaction<'_>,
999 stmt: &mut Statement<Raw>,
1000) -> Result<(), anyhow::Error> {
1001 let Statement::CreateIndex(stmt) = stmt else {
1002 return Ok(());
1003 };
1004
1005 let unresolved_name = match stmt.on_name.clone() {
1006 mz_sql::ast::RawItemName::Name(name) => name,
1007 mz_sql::ast::RawItemName::Id(..) => return Ok(()),
1009 };
1010
1011 let parts = &unresolved_name.0;
1012 let (db_name, schema_name, item_name) = match parts.len() {
1013 3 => (Some(&parts[0]), &parts[1], &parts[2]),
1014 2 => (None, &parts[0], &parts[1]),
1015 _ => panic!("invalid unresolved name: {unresolved_name:?}"),
1016 };
1017
1018 let db_id = db_name.map(|x| {
1019 let db = tx.get_databases().find(|db| db.name == x.as_str());
1020 let db = db.unwrap_or_else(|| panic!("missing database: {x}"));
1021 db.id
1022 });
1023 let schema_id = {
1024 let schema = tx
1025 .get_schemas()
1026 .find(|s| s.name == schema_name.as_str() && s.database_id == db_id);
1027 let schema = schema.unwrap_or_else(|| panic!("missing schema: {schema_name}, {db_id:?}"));
1028 schema.id
1029 };
1030 let item_id = {
1031 let item = tx
1032 .get_items()
1033 .find(|i| i.name == item_name.as_str() && i.schema_id == schema_id);
1034 let item = item.unwrap_or_else(|| panic!("missing item: {item_name}, {schema_id:?}"));
1035 item.id
1036 };
1037
1038 stmt.on_name = mz_sql::ast::RawItemName::Id(item_id.to_string(), unresolved_name, None);
1039
1040 Ok(())
1041}