use std::collections::BTreeMap;

use base64::prelude::*;
use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::{MOCK_AUTHENTICATION_NONCE_KEY, Transaction};
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{
    CreateSinkOptionName, CreateViewStatement, CteBlock, DeferredItemName, IfExistsBehavior, Query,
    SetExpr, SqlServerConfigOptionName, ViewDefinition,
};
use mz_sql::catalog::SessionCatalog;
use mz_sql::names::{FullItemName, QualifiedItemName};
use mz_sql::normalize;
use mz_sql::session::vars::{FORCE_SOURCE_TABLE_SYNTAX, Var, VarInput};
use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::sources::SourceExportStatementDetails;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use prost::Message;
use semver::Version;
use tracing::info;
use uuid::Uuid;

use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;

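/// Key under which the most recently run migration version is durably
/// recorded in the catalog's settings collection.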
const MIGRATION_VERSION_KEY: &str = "migration_version";

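/// Returns the catalog version at which migrations last ran, if any has been
/// recorded.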
pub(crate) fn get_migration_version(txn: &Transaction<'_>) -> Option<Version> {
    txn.get_setting(MIGRATION_VERSION_KEY.into())
        .map(|s| s.parse().expect("valid migration version"))
}

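/// Durably records `version` as the most recently run migration version.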
pub(crate) fn set_migration_version(
    txn: &mut Transaction<'_>,
    version: Version,
) -> Result<(), mz_catalog::durable::CatalogError> {
    txn.set_setting(MIGRATION_VERSION_KEY.into(), Some(version.to_string()))
}

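/// Runs `f` over the parsed AST of every user item in `tx` and writes each
/// (possibly rewritten) statement back as the item's `create_sql`.
///
/// Rewrites applied here may only inspect a statement in isolation; they must
/// not depend on any other catalog state. For example, a migration that strips
/// a deprecated option from `CREATE SINK` statements might look like this
/// (a sketch; `Deprecated` is a hypothetical option name):
///
/// ```ignore
/// rewrite_ast_items(tx, |_tx, _id, stmt| {
///     if let Statement::CreateSink(sink) = stmt {
///         // Hypothetical: drop an option the current version no longer accepts.
///         sink.with_options
///             .retain(|o| o.name != CreateSinkOptionName::Deprecated);
///     }
///     Ok(())
/// })?;
/// ```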
fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

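/// Like [`rewrite_ast_items`], but additionally passes `f` a [`ConnCatalog`]
/// so that rewrites can consult catalog state that has already been loaded
/// into memory (e.g. to resolve item names).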
fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

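/// Updates accumulated while running [`migrate`], which the caller is
/// responsible for applying.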
pub(crate) struct MigrateResult {
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    pub(crate) catalog_updates: Vec<ParsedStateUpdate>,
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

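/// Migrates all user items whose definitions were stored by an earlier catalog
/// version, rewriting their `create_sql` to parse and plan under the current
/// version, and applies the given `item_updates` to the in-memory `state`
/// along the way.
///
/// Returns the builtin table, catalog, and post-item updates produced while
/// migrating; the caller is responsible for applying them.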
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    _now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = get_migration_version(tx).unwrap_or(Version::new(0, 0, 0));

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    // Perform per-item AST rewrites that do not require any catalog context.
    rewrite_ast_items(tx, |_tx, _id, stmt| {
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        ast_rewrite_sql_server_constraints(stmt)?;
        Ok(())
    })?;

    // Consolidate the durable updates produced by the rewrites above with the
    // item updates that were passed in.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Storage collection metadata updates must be applied separately, after
    // all item updates; split them out and return them to the caller.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();

    // Temporarily disable `force_source_table_syntax` so that statements still
    // using the old source syntax can be planned while the item updates are
    // applied; `rewrite_sources_to_tables` below migrates them to the new
    // syntax before the flag is restored.
    let force_source_table_syntax = state.system_config().force_source_table_syntax();
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("off"))
            .expect("known parameter");
    }

    let (mut ast_builtin_table_updates, mut ast_catalog_updates) = state
        .apply_updates_for_bootstrap(item_updates, local_expr_cache)
        .await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    if force_source_table_syntax {
        rewrite_sources_to_tables(tx, &conn_cat)?;
    }

    // Perform per-item rewrites that require catalog context. There are
    // currently none; new rewrites should be added inside this closure.
    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        let _catalog_version = catalog_version.clone();
        Ok(())
    })?;

    // Restore `force_source_table_syntax` now that all statements have been
    // rewritten to the new syntax.
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("on"))
            .expect("known parameter");
    }

    // Apply the updates produced by the context-aware rewrites above.
    let op_item_updates = tx.get_and_commit_op_updates();
    let (item_builtin_table_updates, item_catalog_updates) = state
        .apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
        .await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);
    ast_catalog_updates.extend(item_catalog_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );

    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        catalog_updates: ast_catalog_updates,
        post_item_updates,
    })
}

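/// Rewrites all sources to use the new source-versioned syntax, where a
/// source's data lives in `CREATE TABLE .. FROM SOURCE` statements rather
/// than in the source itself.
///
/// - Multi-output sources (Postgres, MySQL, SQL Server, and multi-output load
///   generators): the `CREATE SOURCE` statement moves to the item that held
///   the progress subsource, taking the source's name with it, while the old
///   source item becomes a placeholder view (named after the old progress
///   subsource) over the relocated source. Each `CREATE SUBSOURCE` is then
///   rewritten to a `CREATE TABLE .. FROM SOURCE`.
/// - Single-output sources (Kafka and single-output load generators): the
///   progress subsource item becomes the `CREATE SOURCE` statement, and the
///   source item becomes a `CREATE TABLE .. FROM SOURCE` carrying the
///   data-bearing options (columns, format, envelope, metadata).
///
/// Existing item ids are reused so that the rewritten statements keep
/// pointing at previously stored state.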
fn rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    catalog: &ConnCatalog<'_>,
) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };

    let mut updated_items = BTreeMap::new();

    let mut sources = vec![];
    let mut subsources = vec![];

    // Partition all items into sources and subsources; everything else is
    // untouched by this migration.
    for item in tx.get_items() {
        let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        match stmt {
            Statement::CreateSubsource(stmt) => subsources.push((item, stmt)),
            Statement::CreateSource(stmt) => sources.push((item, stmt)),
            _ => {}
        }
    }

    let mut pending_progress_items = BTreeMap::new();
    let mut migrated_source_ids = BTreeMap::new();
    for (mut source_item, source_stmt) in sources {
        // Destructure the existing `CREATE SOURCE` statement so that every
        // field is explicitly accounted for below.
        let CreateSourceStatement {
            name,
            in_cluster,
            col_names,
            mut connection,
            include_metadata,
            format,
            envelope,
            if_not_exists,
            key_constraint,
            with_options,
            external_references,
            progress_subsource,
        } = source_stmt;

        let (progress_name, progress_item) = match progress_subsource {
            Some(DeferredItemName::Named(RawItemName::Name(name))) => {
                let partial_name = normalize::unresolved_item_name(name.clone())?;
                (name, catalog.resolve_item(&partial_name)?)
            }
            Some(DeferredItemName::Named(RawItemName::Id(id, name, _))) => {
                let gid = id.parse()?;
                (name, catalog.get_item(&gid))
            }
            Some(DeferredItemName::Deferred(_)) => {
                unreachable!("invalid progress subsource")
            }
            None => {
                info!("migrate: skipping already migrated source: {name}");
                continue;
            }
        };
        let raw_progress_name =
            RawItemName::Id(progress_item.id().to_string(), progress_name.clone(), None);

        // Compute the source's fully qualified name from the catalog.
        let catalog_item = catalog.get_item(&source_item.id);
        let source_name: &QualifiedItemName = catalog_item.name();
        let full_source_name: FullItemName = catalog.resolve_full_name(source_name);
        let source_name: UnresolvedItemName = normalize::unresolve(full_source_name.clone());

        // Strip per-table options (e.g. TEXT COLUMNS) from the source
        // connection; they are carried by the new table statements instead.
        match &mut connection {
            CreateSourceConnection::Postgres { options, .. } => {
                options.retain(|o| match o.name {
                    PgConfigOptionName::Details | PgConfigOptionName::Publication => true,
                    PgConfigOptionName::TextColumns | PgConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::SqlServer { options, .. } => {
                options.retain(|o| match o.name {
                    SqlServerConfigOptionName::Details => true,
                    SqlServerConfigOptionName::TextColumns
                    | SqlServerConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::MySql { options, .. } => {
                options.retain(|o| match o.name {
                    MySqlConfigOptionName::Details => true,
                    MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns => {
                        false
                    }
                });
            }
            CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. } => {
            }
        }

        let (new_progress_name, new_progress_stmt, new_source_name, new_source_stmt) =
            match connection {
                // Multi-output sources: the `CREATE SOURCE` statement moves to
                // the progress item, taking the source's name with it, and the
                // old source item is replaced by a placeholder view over the
                // relocated source, named after the old progress subsource.
                connection @ (CreateSourceConnection::Postgres { .. }
                | CreateSourceConnection::MySql { .. }
                | CreateSourceConnection::SqlServer { .. }
                | CreateSourceConnection::LoadGenerator {
                    generator:
                        LoadGenerator::Tpch | LoadGenerator::Auction | LoadGenerator::Marketing,
                    ..
                }) => {
                    // None of these options are valid on multi-output sources.
                    assert_eq!(col_names, &[]);
                    assert_eq!(key_constraint, None);
                    assert_eq!(format, None);
                    assert_eq!(envelope, None);
                    assert_eq!(include_metadata, &[]);
                    assert_eq!(external_references, None);

                    let dummy_source_stmt = Statement::CreateView(CreateViewStatement {
                        if_exists: IfExistsBehavior::Error,
                        temporary: false,
                        definition: ViewDefinition {
                            name: progress_name,
                            columns: vec![],
                            query: Query {
                                ctes: CteBlock::Simple(vec![]),
                                body: SetExpr::Table(RawItemName::Id(
                                    progress_item.id().to_string(),
                                    source_name.clone(),
                                    None,
                                )),
                                order_by: vec![],
                                limit: None,
                                offset: None,
                            },
                        },
                    });

                    let new_progress_stmt = CreateSourceStatement {
                        name: source_name.clone(),
                        in_cluster,
                        col_names: vec![],
                        connection,
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };

                    migrated_source_ids.insert(source_item.id, progress_item.id());

                    (
                        full_source_name.item,
                        new_progress_stmt,
                        progress_item.name().item.clone(),
                        dummy_source_stmt,
                    )
                }
                CreateSourceConnection::Kafka {
                    options,
                    connection,
                } => {
                    // Key constraints, if any, are intentionally dropped; they
                    // are not carried over to the new table statement.
                    let constraints = vec![];

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // Encode the statement details that the new table carries
                    // in its `DETAILS` option.
                    let details = SourceExportStatementDetails::Kafka {};
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];

                    // The new table's external reference is the Kafka topic.
                    let topic_option = options
                        .iter()
                        .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                        .expect("kafka sources must have a topic");
                    let topic = match &topic_option.value {
                        Some(WithOptionValue::Value(Value::String(topic))) => topic,
                        _ => unreachable!("topic must be a string"),
                    };
                    let external_reference = UnresolvedItemName::qualified(&[Ident::new(topic)?]);

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::Kafka {
                            options,
                            connection,
                        },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
                CreateSourceConnection::LoadGenerator {
                    generator:
                        generator @ (LoadGenerator::Clock
                        | LoadGenerator::Counter
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue),
                    options,
                } => {
                    // As above, any key constraint is intentionally dropped.
                    let constraints = vec![];

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // Single-output load generators emit their data on the
                    // default output.
                    let details = SourceExportStatementDetails::LoadGenerator {
                        output: LoadGeneratorOutput::Default,
                    };
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
                    // Reconstruct the external reference for this generator's
                    // single output.
                    let external_reference = FullItemName {
                        database: mz_sql::names::RawDatabaseSpecifier::Name(
                            mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                .to_owned(),
                        ),
                        schema: generator.schema_name().to_string(),
                        item: generator.schema_name().to_string(),
                    };

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference.into()),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::LoadGenerator { generator, options },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
            };

        info!(
            "migrate: converted source {} to {}",
            source_item.create_sql, new_source_stmt
        );
        source_item.name = new_source_name.clone();
        source_item.create_sql = new_source_stmt.to_ast_string_stable();
        updated_items.insert(source_item.id, source_item);
        // The progress item's rewrite is deferred until the subsource loop
        // below, which locates it by id.
        pending_progress_items.insert(progress_item.id(), (new_progress_name, new_progress_stmt));
    }

    for (mut item, stmt) in subsources {
        match stmt {
            // A subsource without `of_source` is a progress subsource; replace
            // it with the statement staged for it above.
            CreateSubsourceStatement {
                of_source: None, ..
            } => {
                let Some((new_name, new_stmt)) = pending_progress_items.remove(&item.id) else {
                    panic!("encountered orphan progress subsource id: {}", item.id)
                };
                item.name = new_name;
                item.create_sql = new_stmt.to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
            // All other subsources become `CREATE TABLE .. FROM SOURCE`
            // statements.
            CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source: Some(raw_source_name),
                if_not_exists,
                mut with_options,
            } => {
                // Point the table at the item that now holds the `CREATE
                // SOURCE` statement.
                let new_raw_source_name = match raw_source_name {
                    RawItemName::Id(old_id, name, None) => {
                        let old_id: CatalogItemId = old_id.parse().expect("well formed");
                        let new_id = migrated_source_ids[&old_id].clone();
                        RawItemName::Id(new_id.to_string(), name, None)
                    }
                    _ => unreachable!("unexpected source name: {raw_source_name}"),
                };
                // The external reference moves from a `WITH` option to a
                // dedicated field on the table statement.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                let with_options = with_options
                    .into_iter()
                    .map(|option| match option.name {
                        CreateSubsourceOptionName::Details => TableFromSourceOption {
                            name: TableFromSourceOptionName::Details,
                            value: option.value,
                        },
                        CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                            name: TableFromSourceOptionName::TextColumns,
                            value: option.value,
                        },
                        CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                            name: TableFromSourceOptionName::ExcludeColumns,
                            value: option.value,
                        },
                        CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                            name: TableFromSourceOptionName::RetainHistory,
                            value: option.value,
                        },
                        CreateSubsourceOptionName::Progress => {
                            panic!("progress option should not exist on this subsource")
                        }
                        CreateSubsourceOptionName::ExternalReference => {
                            unreachable!("this option is handled separately above")
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source: new_raw_source_name,
                    external_reference: Some(external_reference),
                    with_options,
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
        }
    }
    assert!(
        pending_progress_items.is_empty(),
        "unexpected residual progress items: {pending_progress_items:?}"
    );

    tx.update_items(updated_items)?;

    Ok(())
}

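/// Migrations that run against only the durable catalog, before any in-memory
/// catalog state is available.
///
/// Each migration here is guarded by a durable config flag or setting so that
/// it runs at most once. The pattern is sketched below with hypothetical
/// names (`MY_MIGRATION_KEY`, `MY_MIGRATION_DONE`):
///
/// ```ignore
/// const MY_MIGRATION_KEY: &str = "my_migration";
/// const MY_MIGRATION_DONE: u64 = 1;
/// if tx.get_config(MY_MIGRATION_KEY.to_string()) != Some(MY_MIGRATION_DONE) {
///     // ... perform the one-time rewrite ...
///     tx.set_config(MY_MIGRATION_KEY.to_string(), Some(MY_MIGRATION_DONE))?;
/// }
/// ```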
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // One-time migration: rotate the expression cache onto a fresh shard,
    // finalizing the old one.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // One-time migration: rotate the builtin migration shard onto a fresh
    // shard, finalizing the old one.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }

    // Generate and durably record a mock authentication nonce if one does not
    // yet exist.
    if tx
        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
        .is_none()
    {
        let mut nonce = [0u8; 24];
        openssl::rand::rand_bytes(&mut nonce).expect("failed to generate nonce");
        let nonce = BASE64_STANDARD.encode(nonce);
        tx.set_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string(), Some(nonce))?;
    }

    Ok(())
}

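/// Removes the `PARTITION STRATEGY` option from `CREATE SINK` statements. The
/// option is no longer supported, so durably stored statements that still
/// carry it must be rewritten in order to keep parsing.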
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}

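/// Moves SQL Server primary-key information out of the per-column
/// `primary_key_constraint` field and into the table-level `constraints` list
/// inside the hex-encoded `DETAILS` option of `CREATE SUBSOURCE` and
/// `CREATE TABLE .. FROM SOURCE` statements.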
fn ast_rewrite_sql_server_constraints(stmt: &mut Statement<Raw>) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSubsourceOptionName, TableFromSourceOptionName, Value, WithOptionValue,
    };
    use mz_sql_server_util::desc::{SqlServerTableConstraint, SqlServerTableConstraintType};
    use mz_storage_types::sources::ProtoSourceExportStatementDetails;
    use mz_storage_types::sources::proto_source_export_statement_details::Kind;

    // Find the hex-encoded `DETAILS` option, if this statement has one.
    let deets: Option<&mut String> = match stmt {
        Statement::CreateSubsource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
            if matches!(option.name, CreateSubsourceOptionName::Details)
                && let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
            {
                Some(details)
            } else {
                None
            }
        }),
        Statement::CreateTableFromSource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
            if matches!(option.name, TableFromSourceOptionName::Details)
                && let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
            {
                Some(details)
            } else {
                None
            }
        }),
        _ => None,
    };
    let Some(deets) = deets else {
        return Ok(());
    };

    let current_value = hex::decode(&*deets)?;
    let current_value = ProtoSourceExportStatementDetails::decode(&*current_value)?;

    // Only SQL Server statements need to be rewritten.
    if !matches!(current_value.kind, Some(Kind::SqlServer(_))) {
        return Ok(());
    }

    let SourceExportStatementDetails::SqlServer {
        mut table,
        capture_instance,
        initial_lsn,
    } = SourceExportStatementDetails::from_proto(current_value)?
    else {
        unreachable!("statement details must exist for SQL Server");
    };

    // Nothing to do if the statement already records table-level constraints.
    if !table.constraints.is_empty() {
        return Ok(());
    }

    // Group the per-column primary-key markers by constraint name.
    let mut migrated_constraints: BTreeMap<_, Vec<_>> = BTreeMap::new();
    for col in table.columns.iter_mut() {
        if let Some(constraint_name) = col.primary_key_constraint.take() {
            migrated_constraints
                .entry(constraint_name)
                .or_default()
                .push(col.name.to_string());
        }
    }

    table.constraints = migrated_constraints
        .into_iter()
        .map(|(constraint_name, column_names)| SqlServerTableConstraint {
            constraint_name: constraint_name.to_string(),
            constraint_type: SqlServerTableConstraintType::PrimaryKey,
            column_names,
        })
        .collect();

    // Re-encode the rewritten details in place.
    let new_value = SourceExportStatementDetails::SqlServer {
        table,
        capture_instance,
        initial_lsn,
    };
    *deets = hex::encode(new_value.into_proto().encode_to_vec());

    Ok(())
}