1use std::collections::BTreeMap;
11
12use base64::prelude::*;
13use maplit::btreeset;
14use mz_catalog::builtin::BuiltinTable;
15use mz_catalog::durable::{MOCK_AUTHENTICATION_NONCE_KEY, Transaction};
16use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
17use mz_ore::collections::CollectionExt;
18use mz_ore::now::NowFn;
19use mz_persist_types::ShardId;
20use mz_proto::RustType;
21use mz_repr::{CatalogItemId, Diff, Timestamp};
22use mz_sql::ast::display::AstDisplay;
23use mz_sql::ast::{
24 CreateSinkOptionName, CreateViewStatement, CteBlock, DeferredItemName, IfExistsBehavior, Query,
25 SetExpr, SqlServerConfigOptionName, ViewDefinition,
26};
27use mz_sql::catalog::SessionCatalog;
28use mz_sql::names::{FullItemName, QualifiedItemName};
29use mz_sql::normalize;
30use mz_sql::session::vars::{FORCE_SOURCE_TABLE_SYNTAX, Var, VarInput};
31use mz_sql_parser::ast::{Raw, Statement};
32use mz_storage_client::controller::StorageTxn;
33use mz_storage_types::sources::SourceExportStatementDetails;
34use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
35use prost::Message;
36use semver::Version;
37use tracing::info;
38use uuid::Uuid;
39
40use crate::catalog::open::into_consolidatable_updates_startup;
42use crate::catalog::state::LocalExpressionCache;
43use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};
44use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
45
/// Durable-catalog settings key under which the last-run migration version is stored.
const MIGRATION_VERSION_KEY: &str = "migration_version";
56
57pub(crate) fn get_migration_version(txn: &Transaction<'_>) -> Option<Version> {
58 txn.get_setting(MIGRATION_VERSION_KEY.into())
59 .map(|s| s.parse().expect("valid migration version"))
60}
61
62pub(crate) fn set_migration_version(
63 txn: &mut Transaction<'_>,
64 version: Version,
65) -> Result<(), mz_catalog::durable::CatalogError> {
66 txn.set_setting(MIGRATION_VERSION_KEY.into(), Some(version.to_string()))
67}
68
69fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
70where
71 F: for<'a> FnMut(
72 &'a mut Transaction<'_>,
73 CatalogItemId,
74 &'a mut Statement<Raw>,
75 ) -> Result<(), anyhow::Error>,
76{
77 let mut updated_items = BTreeMap::new();
78
79 for mut item in tx.get_items() {
80 let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
81 f(tx, item.id, &mut stmt)?;
82
83 item.create_sql = stmt.to_ast_string_stable();
84
85 updated_items.insert(item.id, item);
86 }
87 tx.update_items(updated_items)?;
88 Ok(())
89}
90
91fn rewrite_items<F>(
92 tx: &mut Transaction<'_>,
93 cat: &ConnCatalog<'_>,
94 mut f: F,
95) -> Result<(), anyhow::Error>
96where
97 F: for<'a> FnMut(
98 &'a mut Transaction<'_>,
99 &'a &ConnCatalog<'_>,
100 CatalogItemId,
101 &'a mut Statement<Raw>,
102 ) -> Result<(), anyhow::Error>,
103{
104 let mut updated_items = BTreeMap::new();
105 let items = tx.get_items();
106 for mut item in items {
107 let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
108
109 f(tx, &cat, item.id, &mut stmt)?;
110
111 item.create_sql = stmt.to_ast_string_stable();
112
113 updated_items.insert(item.id, item);
114 }
115 tx.update_items(updated_items)?;
116 Ok(())
117}
118
/// The outcome of [`migrate`]: updates the caller must apply once statement
/// migration has completed.
pub(crate) struct MigrateResult {
    // Builtin-table updates produced while applying the migrated item updates.
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    // Parsed catalog state updates produced while applying the migrated item updates.
    pub(crate) catalog_updates: Vec<ParsedStateUpdate>,
    // Updates that must only be applied after all item updates; currently
    // these are exactly the `StorageCollectionMetadata` updates partitioned
    // out in `migrate`.
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}
124
/// Runs all statement migrations against the items in `tx`, applies the
/// resulting item updates to `state`, and returns the updates the caller must
/// still handle (see [`MigrateResult`]).
///
/// `_now` and `_boot_ts` are currently unused but kept so version-specific
/// migrations that need them don't have to change the signature.
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    _now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    // Version recorded by the last successful migration run; 0.0.0 when no
    // migration has ever run.
    let catalog_version = get_migration_version(tx).unwrap_or(Version::new(0, 0, 0));

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    // Purely syntactic AST rewrites that need no in-memory catalog state.
    rewrite_ast_items(tx, |tx, _id, stmt| {
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        ast_rewrite_sql_server_constraints(stmt)?;
        ast_rewrite_add_missing_index_ids(tx, stmt)?;
        Ok(())
    })?;

    // Merge the caller-supplied item updates with the updates produced by the
    // AST rewrites above, then consolidate them to a minimal diff set.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Storage collection metadata must be applied after all item updates, so
    // split it out and hand it back to the caller untouched.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();

    // Temporarily turn FORCE_SOURCE_TABLE_SYNTAX off while applying the
    // pre-migration statements — NOTE(review): presumably so statements in
    // the old syntax still plan; it is restored below once rewriting is done.
    let force_source_table_syntax = state.system_config().force_source_table_syntax();
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("off"))
            .expect("known parameter");
    }

    let (mut ast_builtin_table_updates, mut ast_catalog_updates) =
        state.apply_updates(item_updates, local_expr_cache).await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    // With the flag enabled, rewrite old-style sources/subsources into the
    // source-table syntax.
    if force_source_table_syntax {
        rewrite_sources_to_tables(tx, &conn_cat)?;
    }

    // Placeholder for version-gated semantic rewrites; currently a no-op.
    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        Ok(())
    })?;

    // Restore the flag disabled above.
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("on"))
            .expect("known parameter");
    }

    // Apply the updates produced by the item rewrites above and fold their
    // effects into the results accumulated so far.
    let op_item_updates = tx.get_and_commit_op_updates();
    let (item_builtin_table_updates, item_catalog_updates) =
        state.apply_updates(op_item_updates, local_expr_cache).await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);
    ast_catalog_updates.extend(item_catalog_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );

    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        catalog_updates: ast_catalog_updates,
        post_item_updates,
    })
}
260
/// Rewrites all old-style sources and subsources in `tx` into the
/// source-table syntax (`CREATE TABLE ... FROM SOURCE`).
///
/// At a high level: each source's ingestion definition is moved onto its
/// former progress subsource item (which becomes the `CREATE SOURCE`
/// statement), export subsources become `CREATE TABLE ... FROM SOURCE`
/// statements referencing it, and single-output sources (Kafka and simple
/// load generators) themselves become `CREATE TABLE ... FROM SOURCE`
/// statements. Sources without a progress subsource are assumed to be
/// already migrated and are skipped.
fn rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    catalog: &ConnCatalog<'_>,
) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };

    let mut updated_items = BTreeMap::new();

    // Collect sources and subsources up front so they can be rewritten in two
    // passes: sources first, then the subsources that depend on them.
    let mut sources = vec![];
    let mut subsources = vec![];

    for item in tx.get_items() {
        let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        match stmt {
            Statement::CreateSubsource(stmt) => subsources.push((item, stmt)),
            Statement::CreateSource(stmt) => sources.push((item, stmt)),
            _ => {}
        }
    }

    // Progress-subsource id -> (new name, new statement); consumed by the
    // subsource pass below, and asserted empty at the end.
    let mut pending_progress_items = BTreeMap::new();
    // Old source id -> id of the item that now holds the source statement.
    let mut migrated_source_ids = BTreeMap::new();
    for (mut source_item, source_stmt) in sources {
        let CreateSourceStatement {
            name,
            in_cluster,
            col_names,
            mut connection,
            include_metadata,
            format,
            envelope,
            if_not_exists,
            key_constraint,
            with_options,
            external_references,
            progress_subsource,
        } = source_stmt;

        // Resolve the source's progress subsource. A source with no progress
        // subsource has already been migrated.
        let (progress_name, progress_item) = match progress_subsource {
            Some(DeferredItemName::Named(RawItemName::Name(name))) => {
                let partial_name = normalize::unresolved_item_name(name.clone())?;
                (name, catalog.resolve_item(&partial_name)?)
            }
            Some(DeferredItemName::Named(RawItemName::Id(id, name, _))) => {
                let gid = id.parse()?;
                (name, catalog.get_item(&gid))
            }
            Some(DeferredItemName::Deferred(_)) => {
                unreachable!("invalid progress subsource")
            }
            None => {
                info!("migrate: skipping already migrated source: {name}");
                continue;
            }
        };
        let raw_progress_name =
            RawItemName::Id(progress_item.id().to_string(), progress_name.clone(), None);

        let catalog_item = catalog.get_item(&source_item.id);
        let source_name: &QualifiedItemName = catalog_item.name();
        let full_source_name: FullItemName = catalog.resolve_full_name(source_name);
        let source_name: UnresolvedItemName = normalize::unresolve(full_source_name.clone());

        // Drop per-export options from the connection; text/exclude columns
        // are carried by the individual table statements instead (see the
        // subsource pass below).
        match &mut connection {
            CreateSourceConnection::Postgres { options, .. } => {
                options.retain(|o| match o.name {
                    PgConfigOptionName::Details | PgConfigOptionName::Publication => true,
                    PgConfigOptionName::TextColumns | PgConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::SqlServer { options, .. } => {
                options.retain(|o| match o.name {
                    SqlServerConfigOptionName::Details => true,
                    SqlServerConfigOptionName::TextColumns
                    | SqlServerConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::MySql { options, .. } => {
                options.retain(|o| match o.name {
                    MySqlConfigOptionName::Details => true,
                    MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns => {
                        false
                    }
                });
            }
            CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. } => {
            }
        }

        let (new_progress_name, new_progress_stmt, new_source_name, new_source_stmt) =
            match connection {
                // Multi-output sources: all data flows through subsources, so
                // none of these single-output clauses may be present.
                connection @ (CreateSourceConnection::Postgres { .. }
                | CreateSourceConnection::MySql { .. }
                | CreateSourceConnection::SqlServer { .. }
                | CreateSourceConnection::LoadGenerator {
                    generator:
                        LoadGenerator::Tpch | LoadGenerator::Auction | LoadGenerator::Marketing,
                    ..
                }) => {
                    assert_eq!(col_names, &[]);
                    assert_eq!(key_constraint, None);
                    assert_eq!(format, None);
                    assert_eq!(envelope, None);
                    assert_eq!(include_metadata, &[]);
                    assert_eq!(external_references, None);

                    // The old source item becomes a view (installed below
                    // under the progress item's old name) selecting from the
                    // item that now holds the source definition.
                    let dummy_source_stmt = Statement::CreateView(CreateViewStatement {
                        if_exists: IfExistsBehavior::Error,
                        temporary: false,
                        definition: ViewDefinition {
                            name: progress_name,
                            columns: vec![],
                            query: Query {
                                ctes: CteBlock::Simple(vec![]),
                                body: SetExpr::Table(RawItemName::Id(
                                    progress_item.id().to_string(),
                                    source_name.clone(),
                                    None,
                                )),
                                order_by: vec![],
                                limit: None,
                                offset: None,
                            },
                        },
                    });

                    // The progress item takes over the `CREATE SOURCE`
                    // statement under the source's original name.
                    let new_progress_stmt = CreateSourceStatement {
                        name: source_name.clone(),
                        in_cluster,
                        col_names: vec![],
                        connection,
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };

                    migrated_source_ids.insert(source_item.id, progress_item.id());

                    // Note: the source item and the progress item swap names
                    // in this branch (compare with the Kafka branch below).
                    (
                        full_source_name.item,
                        new_progress_stmt,
                        progress_item.name().item.clone(),
                        dummy_source_stmt,
                    )
                }
                // Single-output Kafka source: the source item becomes a table
                // reading from the (renamed-in-place) progress item.
                CreateSourceConnection::Kafka {
                    options,
                    connection,
                } => {
                    // NOTE(review): both branches yield an empty vec — key
                    // constraints appear to be intentionally dropped here;
                    // confirm this is the desired behavior.
                    let constraints = if let Some(_key_constraint) = key_constraint {
                        vec![]
                    } else {
                        vec![]
                    };

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // The table carries its export details as a hex-encoded
                    // protobuf in the `DETAILS` option.
                    let details = SourceExportStatementDetails::Kafka {};
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
                    // The table's external reference is the Kafka topic.
                    let topic_option = options
                        .iter()
                        .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                        .expect("kafka sources must have a topic");
                    let topic = match &topic_option.value {
                        Some(WithOptionValue::Value(Value::String(topic))) => topic,
                        _ => unreachable!("topic must be a string"),
                    };
                    let external_reference = UnresolvedItemName::qualified(&[Ident::new(topic)?]);

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::Kafka {
                            options,
                            connection,
                        },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
                // Single-output load generators: handled like Kafka, but the
                // external reference is the generator's well-known schema.
                CreateSourceConnection::LoadGenerator {
                    generator:
                        generator @ (LoadGenerator::Clock
                        | LoadGenerator::Counter
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue),
                    options,
                } => {
                    // NOTE(review): same as the Kafka branch — key
                    // constraints are dropped in both arms.
                    let constraints = if let Some(_key_constraint) = key_constraint {
                        vec![]
                    } else {
                        vec![]
                    };

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    let details = SourceExportStatementDetails::LoadGenerator {
                        output: LoadGeneratorOutput::Default,
                    };
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
                    let external_reference = FullItemName {
                        database: mz_sql::names::RawDatabaseSpecifier::Name(
                            mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                .to_owned(),
                        ),
                        schema: generator.schema_name().to_string(),
                        item: generator.schema_name().to_string(),
                    };

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference.into()),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::LoadGenerator { generator, options },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
            };

        info!(
            "migrate: converted source {} to {}",
            source_item.create_sql, new_source_stmt
        );
        // Install the rewritten source item now; the progress item's rewrite
        // is deferred to the subsource pass below.
        source_item.name = new_source_name.clone();
        source_item.create_sql = new_source_stmt.to_ast_string_stable();
        updated_items.insert(source_item.id, source_item);
        pending_progress_items.insert(progress_item.id(), (new_progress_name, new_progress_stmt));
    }

    for (mut item, stmt) in subsources {
        match stmt {
            // A subsource without `of_source` is a progress subsource; it is
            // replaced wholesale by the statement prepared in the source pass.
            CreateSubsourceStatement {
                of_source: None, ..
            } => {
                let Some((new_name, new_stmt)) = pending_progress_items.remove(&item.id) else {
                    panic!("encountered orphan progress subsource id: {}", item.id)
                };
                item.name = new_name;
                item.create_sql = new_stmt.to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
            // An export subsource becomes a `CREATE TABLE ... FROM SOURCE`.
            CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source: Some(raw_source_name),
                if_not_exists,
                mut with_options,
            } => {
                // Repoint the reference at the item that now holds the source
                // statement, as recorded in `migrated_source_ids`.
                let new_raw_source_name = match raw_source_name {
                    RawItemName::Id(old_id, name, None) => {
                        let old_id: CatalogItemId = old_id.parse().expect("well formed");
                        let new_id = migrated_source_ids[&old_id].clone();
                        RawItemName::Id(new_id.to_string(), name, None)
                    }
                    _ => unreachable!("unexpected source name: {raw_source_name}"),
                };
                // Pull the external reference out of the options; it moves to
                // a dedicated field on the table statement.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                // Translate the remaining subsource options one-to-one.
                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                                name: TableFromSourceOptionName::RetainHistory,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("This option is handled separately above.")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source: new_raw_source_name,
                    external_reference: Some(external_reference),
                    with_options,
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
        }
    }
    // Every progress statement prepared in the source pass must have found
    // its corresponding progress subsource item.
    assert!(
        pending_progress_items.is_empty(),
        "unexpected residual progress items: {pending_progress_items:?}"
    );

    tx.update_items(updated_items)?;

    Ok(())
}
786
787pub(crate) fn durable_migrate(
791 tx: &mut Transaction,
792 _organization_id: Uuid,
793 _boot_ts: Timestamp,
794) -> Result<(), anyhow::Error> {
795 const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
798 const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
799 if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
800 if let Some(shard_id) = tx.get_expression_cache_shard() {
801 tx.mark_shards_as_finalized(btreeset! {shard_id});
802 tx.set_expression_cache_shard(ShardId::new())?;
803 }
804 tx.set_config(
805 EXPR_CACHE_MIGRATION_KEY.to_string(),
806 Some(EXPR_CACHE_MIGRATION_DONE),
807 )?;
808 }
809
810 const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
813 const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
814 if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
815 != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
816 {
817 if let Some(shard_id) = tx.get_builtin_migration_shard() {
818 tx.mark_shards_as_finalized(btreeset! {shard_id});
819 tx.set_builtin_migration_shard(ShardId::new())?;
820 }
821 tx.set_config(
822 BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
823 Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
824 )?;
825 }
826
827 if tx
828 .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
829 .is_none()
830 {
831 let mut nonce = [0u8; 24];
832 let _ = openssl::rand::rand_bytes(&mut nonce).expect("failed to generate nonce");
833 let nonce = BASE64_STANDARD.encode(nonce);
834 tx.set_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string(), Some(nonce))?;
835 }
836
837 Ok(())
838}
839
840fn ast_rewrite_create_sink_partition_strategy(
852 stmt: &mut Statement<Raw>,
853) -> Result<(), anyhow::Error> {
854 let Statement::CreateSink(stmt) = stmt else {
855 return Ok(());
856 };
857 stmt.with_options
858 .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
859 Ok(())
860}
861
862fn ast_rewrite_sql_server_constraints(stmt: &mut Statement<Raw>) -> Result<(), anyhow::Error> {
864 use mz_sql::ast::{
865 CreateSubsourceOptionName, TableFromSourceOptionName, Value, WithOptionValue,
866 };
867 use mz_sql_server_util::desc::{SqlServerTableConstraint, SqlServerTableConstraintType};
868 use mz_storage_types::sources::ProtoSourceExportStatementDetails;
869 use mz_storage_types::sources::proto_source_export_statement_details::Kind;
870
871 let deets: Option<&mut String> = match stmt {
872 Statement::CreateSubsource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
873 if matches!(option.name, CreateSubsourceOptionName::Details)
874 && let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
875 {
876 Some(details)
877 } else {
878 None
879 }
880 }),
881 Statement::CreateTableFromSource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
882 if matches!(option.name, TableFromSourceOptionName::Details)
883 && let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
884 {
885 Some(details)
886 } else {
887 None
888 }
889 }),
890 _ => None,
891 };
892 let Some(deets) = deets else {
893 return Ok(());
894 };
895
896 let current_value = hex::decode(&mut *deets)?;
897 let current_value = ProtoSourceExportStatementDetails::decode(&*current_value)?;
898
899 if !matches!(current_value.kind, Some(Kind::SqlServer(_))) {
901 return Ok(());
902 };
903
904 let SourceExportStatementDetails::SqlServer {
905 mut table,
906 capture_instance,
907 initial_lsn,
908 } = SourceExportStatementDetails::from_proto(current_value)?
909 else {
910 unreachable!("statement details must exist for SQL Server");
911 };
912
913 if !table.constraints.is_empty() {
915 return Ok(());
916 }
917
918 let mut migrated_constraints: BTreeMap<_, Vec<_>> = BTreeMap::new();
921 for col in table.columns.iter_mut() {
922 if let Some(constraint_name) = col.primary_key_constraint.take() {
923 migrated_constraints
924 .entry(constraint_name)
925 .or_default()
926 .push(col.name.to_string());
927 }
928 }
929
930 table.constraints = migrated_constraints
931 .into_iter()
932 .map(|(constraint_name, column_names)| SqlServerTableConstraint {
933 constraint_name: constraint_name.to_string(),
934 constraint_type: SqlServerTableConstraintType::PrimaryKey,
935 column_names,
936 })
937 .collect();
938
939 let new_value = SourceExportStatementDetails::SqlServer {
940 table,
941 capture_instance,
942 initial_lsn,
943 };
944 *deets = hex::encode(new_value.into_proto().encode_to_vec());
945
946 Ok(())
947}
948
949fn ast_rewrite_add_missing_index_ids(
951 tx: &Transaction<'_>,
952 stmt: &mut Statement<Raw>,
953) -> Result<(), anyhow::Error> {
954 let Statement::CreateIndex(stmt) = stmt else {
955 return Ok(());
956 };
957
958 let unresolved_name = match stmt.on_name.clone() {
959 mz_sql::ast::RawItemName::Name(name) => name,
960 mz_sql::ast::RawItemName::Id(..) => return Ok(()),
962 };
963
964 let parts = &unresolved_name.0;
965 let (db_name, schema_name, item_name) = match parts.len() {
966 3 => (Some(&parts[0]), &parts[1], &parts[2]),
967 2 => (None, &parts[0], &parts[1]),
968 _ => panic!("invalid unresolved name: {unresolved_name:?}"),
969 };
970
971 let db_id = db_name.map(|x| {
972 let db = tx.get_databases().find(|db| db.name == x.as_str());
973 let db = db.unwrap_or_else(|| panic!("missing database: {x}"));
974 db.id
975 });
976 let schema_id = {
977 let schema = tx
978 .get_schemas()
979 .find(|s| s.name == schema_name.as_str() && s.database_id == db_id);
980 let schema = schema.unwrap_or_else(|| panic!("missing schema: {schema_name}, {db_id:?}"));
981 schema.id
982 };
983 let item_id = {
984 let item = tx
985 .get_items()
986 .find(|i| i.name == item_name.as_str() && i.schema_id == schema_id);
987 let item = item.unwrap_or_else(|| panic!("missing item: {item_name}, {schema_id:?}"));
988 item.id
989 };
990
991 stmt.on_name = mz_sql::ast::RawItemName::Id(item_id.to_string(), unresolved_name, None);
992
993 Ok(())
994}