use std::collections::BTreeMap;

use base64::prelude::*;
use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::{MOCK_AUTHENTICATION_NONCE_KEY, Transaction};
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{
    CreateSinkOptionName, CreateViewStatement, CteBlock, DeferredItemName, IfExistsBehavior, Query,
    SetExpr, SqlServerConfigOptionName, ViewDefinition,
};
use mz_sql::catalog::SessionCatalog;
use mz_sql::names::{FullItemName, QualifiedItemName};
use mz_sql::normalize;
use mz_sql::session::vars::{FORCE_SOURCE_TABLE_SYNTAX, Var, VarInput};
use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::sources::SourceExportStatementDetails;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use prost::Message;
use semver::Version;
use tracing::info;
use uuid::Uuid;

use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;

/// The key under which the version of the last successful migration run is
/// stored in the catalog's settings collection.
const MIGRATION_VERSION_KEY: &str = "migration_version";

pub(crate) fn get_migration_version(txn: &Transaction<'_>) -> Option<Version> {
    txn.get_setting(MIGRATION_VERSION_KEY.into())
        .map(|s| s.parse().expect("valid migration version"))
}

pub(crate) fn set_migration_version(
    txn: &mut Transaction<'_>,
    version: Version,
) -> Result<(), mz_catalog::durable::CatalogError> {
    txn.set_setting(MIGRATION_VERSION_KEY.into(), Some(version.to_string()))
}

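// A typical caller pairs these helpers around a run of migrations, e.g.
// (sketch; `current_version` is whatever version the caller is upgrading to):
//
//     if get_migration_version(tx).as_ref() != Some(&current_version) {
//         // ... run migrations ...
//         set_migration_version(tx, current_version)?;
//     }

/// Parses the `create_sql` of every item in the catalog transaction, applies
/// `f` to the resulting AST, and writes the (possibly mutated) statements
/// back. A minimal migration closure, modeled on the sink rewrite at the
/// bottom of this file:
///
/// ```ignore
/// rewrite_ast_items(tx, |_tx, _id, stmt| {
///     if let Statement::CreateSink(sink) = stmt {
///         sink.with_options
///             .retain(|o| o.name != CreateSinkOptionName::PartitionStrategy);
///     }
///     Ok(())
/// })?;
/// ```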
fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

/// Like [`rewrite_ast_items`], but also hands `f` a [`ConnCatalog`] so that
/// migrations can resolve names against the current catalog state.
fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

pub(crate) struct MigrateResult {
    /// Updates to builtin tables produced while applying the item updates.
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    /// Parsed catalog state updates produced while applying the item updates.
    pub(crate) catalog_updates: Vec<ParsedStateUpdate>,
    /// Updates (currently storage collection metadata) that must be applied
    /// only after all item updates have been processed.
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

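/// Migrates all user items and loads them into `state`, returning the builtin
/// table updates and catalog updates produced along the way.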
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    _now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = get_migration_version(tx).unwrap_or(Version::new(0, 0, 0));

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    rewrite_ast_items(tx, |_tx, _id, stmt| {
        // Add per-item AST migrations below. Each migration takes the raw
        // AST of an item's create SQL and mutates it in place; the result is
        // staged for commit to the catalog.
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        ast_rewrite_sql_server_constraints(stmt)?;
        Ok(())
    })?;

    // Load the durable item updates, fold in the updates staged by the AST
    // rewrites above, and consolidate.
    let commit_ts = tx.upper();
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
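    // `consolidate_updates` sorts the `(kind, ts, diff)` triples and sums the
    // diffs of equal `(kind, ts)` pairs, dropping any that cancel to zero, so
    // a retraction and re-addition of the same item collapse away, e.g.
    // (sketch):
    //
    //     [(x, 1, +1), (x, 1, -1), (y, 1, +1)]  ==>  [(y, 1, +1)]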
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Partition out storage collection metadata updates; they are applied
    // only after all item updates have been processed.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();

    // Temporarily disable the flag that forces the new source table syntax:
    // statements still in the catalog may use the old syntax and must load
    // successfully before `rewrite_sources_to_tables` below converts them.
    // The flag is restored once the rewrite has run.
    let force_source_table_syntax = state.system_config().force_source_table_syntax();
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("off"))
            .expect("known parameter");
    }

    let (mut ast_builtin_table_updates, mut ast_catalog_updates) =
        state.apply_updates(item_updates, local_expr_cache).await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    // With the in-memory catalog populated, rewrite any remaining old-style
    // sources and subsources into the source table syntax.
    if force_source_table_syntax {
        rewrite_sources_to_tables(tx, &conn_cat)?;
    }

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        // Add catalog-aware item migrations here; none are needed at this
        // version.
        let _catalog_version = catalog_version.clone();
        Ok(())
    })?;

    // Restore the source table syntax flag now that all statements have been
    // rewritten.
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("on"))
            .expect("known parameter");
    }

    // Apply the updates produced by the migrations above.
    let op_item_updates = tx.get_and_commit_op_updates();
    let (item_builtin_table_updates, item_catalog_updates) =
        state.apply_updates(op_item_updates, local_expr_cache).await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);
    ast_catalog_updates.extend(item_catalog_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );

    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        catalog_updates: ast_catalog_updates,
        post_item_updates,
    })
}

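/// Rewrites legacy multi-output sources into the "source tables" model:
///
/// * Each `CREATE SOURCE` swaps roles with its progress subsource: the
///   progress item's ID hosts the new `CREATE SOURCE`, while the old source
///   item becomes either a `CREATE TABLE ... FROM SOURCE` (Kafka and
///   single-output load generators) or a placeholder view (Postgres, MySQL,
///   SQL Server, and multi-output load generators).
/// * Each data-emitting `CREATE SUBSOURCE` becomes a
///   `CREATE TABLE ... FROM SOURCE` pointing at the new source ID, roughly
///   (sketch; names and option spellings approximate):
///
/// ```ignore
/// CREATE SUBSOURCE t (a int) OF SOURCE s WITH (EXTERNAL REFERENCE = 'db.t');
/// -- becomes
/// CREATE TABLE t (a int) FROM SOURCE s (REFERENCE 'db.t');
/// ```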
fn rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    catalog: &ConnCatalog<'_>,
) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };

    let mut updated_items = BTreeMap::new();

    let mut sources = vec![];
    let mut subsources = vec![];

    // Collect all sources and subsources; other items are untouched by this
    // migration.
    for item in tx.get_items() {
        let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        match stmt {
            Statement::CreateSubsource(stmt) => subsources.push((item, stmt)),
            Statement::CreateSource(stmt) => sources.push((item, stmt)),
            _ => {}
        }
    }

    // Rewrite each source, remembering the new statement for its progress
    // subsource and the source-to-progress ID mapping needed to re-point the
    // subsources below.
    let mut pending_progress_items = BTreeMap::new();
    let mut migrated_source_ids = BTreeMap::new();
    for (mut source_item, source_stmt) in sources {
        let CreateSourceStatement {
            name,
            in_cluster,
            col_names,
            mut connection,
            include_metadata,
            format,
            envelope,
            if_not_exists,
            key_constraint,
            with_options,
            external_references,
            progress_subsource,
        } = source_stmt;

        // Resolve the source's progress subsource; sources without one have
        // already been migrated.
        let (progress_name, progress_item) = match progress_subsource {
            Some(DeferredItemName::Named(RawItemName::Name(name))) => {
                let partial_name = normalize::unresolved_item_name(name.clone())?;
                (name, catalog.resolve_item(&partial_name)?)
            }
            Some(DeferredItemName::Named(RawItemName::Id(id, name, _))) => {
                let gid = id.parse()?;
                (name, catalog.get_item(&gid))
            }
            Some(DeferredItemName::Deferred(_)) => {
                unreachable!("invalid progress subsource")
            }
            None => {
                info!("migrate: skipping already migrated source: {name}");
                continue;
            }
        };
        // Refer to the progress item by ID so that renames cannot break the
        // link.
        let raw_progress_name =
            RawItemName::Id(progress_item.id().to_string(), progress_name.clone(), None);

        let catalog_item = catalog.get_item(&source_item.id);
        let source_name: &QualifiedItemName = catalog_item.name();
        let full_source_name: FullItemName = catalog.resolve_full_name(source_name);
        let source_name: UnresolvedItemName = normalize::unresolve(full_source_name.clone());

        // Drop the per-table `TEXT COLUMNS` and `EXCLUDE COLUMNS` options
        // from the source; the equivalent options live on the subsources,
        // which become tables below.
        match &mut connection {
            CreateSourceConnection::Postgres { options, .. } => {
                options.retain(|o| match o.name {
                    PgConfigOptionName::Details | PgConfigOptionName::Publication => true,
                    PgConfigOptionName::TextColumns | PgConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::SqlServer { options, .. } => {
                options.retain(|o| match o.name {
                    SqlServerConfigOptionName::Details => true,
                    SqlServerConfigOptionName::TextColumns
                    | SqlServerConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::MySql { options, .. } => {
                options.retain(|o| match o.name {
                    MySqlConfigOptionName::Details => true,
                    MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns => {
                        false
                    }
                });
            }
            CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. } => {
            }
        }

        // Build the replacement statements for this source.
        let (new_progress_name, new_progress_stmt, new_source_name, new_source_stmt) =
            match connection {
                // Multi-output sources: the progress item takes over as the
                // real `CREATE SOURCE` (under the source's name), and the
                // old source item becomes a placeholder view (under the
                // progress item's name).
                connection @ (CreateSourceConnection::Postgres { .. }
                | CreateSourceConnection::MySql { .. }
                | CreateSourceConnection::SqlServer { .. }
                | CreateSourceConnection::LoadGenerator {
                    generator:
                        LoadGenerator::Tpch | LoadGenerator::Auction | LoadGenerator::Marketing,
                    ..
                }) => {
                    // These sources never carry data-shaping options
                    // themselves; those live on their subsources.
                    assert_eq!(col_names, &[]);
                    assert_eq!(key_constraint, None);
                    assert_eq!(format, None);
                    assert_eq!(envelope, None);
                    assert_eq!(include_metadata, &[]);
                    assert_eq!(external_references, None);

                    let dummy_source_stmt = Statement::CreateView(CreateViewStatement {
                        if_exists: IfExistsBehavior::Error,
                        temporary: false,
                        definition: ViewDefinition {
                            name: progress_name,
                            columns: vec![],
                            query: Query {
                                ctes: CteBlock::Simple(vec![]),
                                body: SetExpr::Table(RawItemName::Id(
                                    progress_item.id().to_string(),
                                    source_name.clone(),
                                    None,
                                )),
                                order_by: vec![],
                                limit: None,
                                offset: None,
                            },
                        },
                    });

                    let new_progress_stmt = CreateSourceStatement {
                        name: source_name.clone(),
                        in_cluster,
                        col_names: vec![],
                        connection,
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };

                    migrated_source_ids.insert(source_item.id, progress_item.id());

                    (
                        full_source_name.item,
                        new_progress_stmt,
                        progress_item.name().item.clone(),
                        dummy_source_stmt,
                    )
                }
                // Kafka sources: the source item becomes a
                // `CREATE TABLE ... FROM SOURCE` reading from the progress
                // item, which becomes the new `CREATE SOURCE`.
                CreateSourceConnection::Kafka {
                    options,
                    connection,
                } => {
                    // Key constraints, if any, are not carried over to the
                    // new table statement.
                    let constraints = vec![];

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // Stash the source export details on the table as a
                    // hex-encoded protobuf `DETAILS` option.
                    let details = SourceExportStatementDetails::Kafka {};
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];
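
                    // The `DETAILS` value round-trips through protobuf + hex,
                    // i.e. (sketch):
                    //
                    //     let bytes = hex::decode(&value)?;
                    //     let details =
                    //         ProtoSourceExportStatementDetails::decode(&*bytes)?;
                    //
                    // exactly the encoding `ast_rewrite_sql_server_constraints`
                    // at the bottom of this file decodes.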

                    // The table's external reference is the Kafka topic.
                    let topic_option = options
                        .iter()
                        .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                        .expect("kafka sources must have a topic");
                    let topic = match &topic_option.value {
                        Some(WithOptionValue::Value(Value::String(topic))) => topic,
                        _ => unreachable!("topic must be a string"),
                    };
                    let external_reference = UnresolvedItemName::qualified(&[Ident::new(topic)?]);

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference),
                            with_options: table_with_options,
                        });

                    // The progress item keeps its name and becomes the new
                    // `CREATE SOURCE`.
                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::Kafka {
                            options,
                            connection,
                        },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
                // Single-output load generators follow the same pattern as
                // Kafka sources.
                CreateSourceConnection::LoadGenerator {
                    generator:
                        generator @ (LoadGenerator::Clock
                        | LoadGenerator::Counter
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue),
                    options,
                } => {
                    // Key constraints, if any, are not carried over to the
                    // new table statement.
                    let constraints = vec![];

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // Single-output load generators emit on the default
                    // output.
                    let details = SourceExportStatementDetails::LoadGenerator {
                        output: LoadGeneratorOutput::Default,
                    };
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];

                    // Load generators use a synthetic external reference in
                    // the load generator database, with the generator's
                    // schema name doubling as the item name.
                    let external_reference = FullItemName {
                        database: mz_sql::names::RawDatabaseSpecifier::Name(
                            mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                .to_owned(),
                        ),
                        schema: generator.schema_name().to_string(),
                        item: generator.schema_name().to_string(),
                    };

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference.into()),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::LoadGenerator { generator, options },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
            };

        info!(
            "migrate: converted source {} to {}",
            source_item.create_sql, new_source_stmt
        );
        source_item.name = new_source_name.clone();
        source_item.create_sql = new_source_stmt.to_ast_string_stable();
        updated_items.insert(source_item.id, source_item);
        // Stage the new progress statement; it is installed when its
        // subsource item is visited below.
        pending_progress_items.insert(progress_item.id(), (new_progress_name, new_progress_stmt));
    }

    for (mut item, stmt) in subsources {
        match stmt {
            // Progress subsources (the only subsources without `of_source`)
            // are replaced by the new `CREATE SOURCE` statements staged
            // above.
            CreateSubsourceStatement {
                of_source: None, ..
            } => {
                let Some((new_name, new_stmt)) = pending_progress_items.remove(&item.id) else {
                    panic!("encountered orphan progress subsource id: {}", item.id)
                };
                item.name = new_name;
                item.create_sql = new_stmt.to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
            // Data-emitting subsources become tables that read from the new
            // source (the old source's progress item).
            CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source: Some(raw_source_name),
                if_not_exists,
                mut with_options,
            } => {
                // Re-point the subsource at the ID that now hosts the new
                // `CREATE SOURCE`.
                let new_raw_source_name = match raw_source_name {
                    RawItemName::Id(old_id, name, None) => {
                        let old_id: CatalogItemId = old_id.parse().expect("well formed");
                        let new_id = migrated_source_ids[&old_id].clone();
                        RawItemName::Id(new_id.to_string(), name, None)
                    }
                    _ => unreachable!("unexpected source name: {raw_source_name}"),
                };

                // Pull out the `EXTERNAL REFERENCE` option; it becomes the
                // table's external reference.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                // The remaining subsource options map one-to-one onto table
                // options.
                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                                name: TableFromSourceOptionName::RetainHistory,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("this option is handled separately above")
                            }
                        }
                    })
                    .collect::<Vec<_>>();

                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source: new_raw_source_name,
                    external_reference: Some(external_reference),
                    with_options,
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
        }
    }
    // Every staged progress statement must have replaced a progress
    // subsource above.
    assert!(
        pending_progress_items.is_empty(),
        "unexpected residual progress items: {pending_progress_items:?}"
    );

    tx.update_items(updated_items)?;

    Ok(())
}

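/// Migrations that operate only on the durable catalog state: rotating the
/// expression cache and builtin migration shards and seeding the mock
/// authentication nonce.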
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // Rotate the expression cache to a new shard, marking the old shard for
    // finalization.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // Rotate the builtin migration shard in the same way.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }
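
    // Both shard rotations above follow the same idempotent pattern (sketch):
    //
    //     if tx.get_config(KEY.to_string()) != Some(DONE) {
    //         /* one-time work */
    //         tx.set_config(KEY.to_string(), Some(DONE))?;
    //     }
    //
    // so re-running `durable_migrate` after an interrupted boot is safe.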

    // Seed the mock authentication nonce if it has not been set yet.
    if tx
        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
        .is_none()
    {
        let mut nonce = [0u8; 24];
        openssl::rand::rand_bytes(&mut nonce).expect("failed to generate nonce");
        let nonce = BASE64_STANDARD.encode(nonce);
        tx.set_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string(), Some(nonce))?;
    }

    Ok(())
}

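/// Removes the `PARTITION STRATEGY` option, which is no longer supported,
/// from `CREATE SINK` statements so that they continue to parse.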
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
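    // E.g. (sketch; option spelling approximate):
    //
    //     CREATE SINK s ... WITH (PARTITION STRATEGY = 'v1', SNAPSHOT = false)
    //       ==>
    //     CREATE SINK s ... WITH (SNAPSHOT = false)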
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}

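/// Moves SQL Server primary-key information off of individual columns and
/// into table-level constraints inside the hex-encoded `DETAILS` option of
/// `CREATE SUBSOURCE` and `CREATE TABLE ... FROM SOURCE` statements.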
fn ast_rewrite_sql_server_constraints(stmt: &mut Statement<Raw>) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSubsourceOptionName, TableFromSourceOptionName, Value, WithOptionValue,
    };
    use mz_sql_server_util::desc::{SqlServerTableConstraint, SqlServerTableConstraintType};
    use mz_storage_types::sources::ProtoSourceExportStatementDetails;
    use mz_storage_types::sources::proto_source_export_statement_details::Kind;

    // Find the hex-encoded `DETAILS` option, if this statement carries one.
    let details: Option<&mut String> = match stmt {
        Statement::CreateSubsource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
            if matches!(option.name, CreateSubsourceOptionName::Details)
                && let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
            {
                Some(details)
            } else {
                None
            }
        }),
        Statement::CreateTableFromSource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
            if matches!(option.name, TableFromSourceOptionName::Details)
                && let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
            {
                Some(details)
            } else {
                None
            }
        }),
        _ => None,
    };
    let Some(details) = details else {
        return Ok(());
    };

    let current_value = hex::decode(&*details)?;
    let current_value = ProtoSourceExportStatementDetails::decode(&*current_value)?;

    // Only SQL Server exports need this rewrite.
    if !matches!(current_value.kind, Some(Kind::SqlServer(_))) {
        return Ok(());
    }

    let SourceExportStatementDetails::SqlServer {
        mut table,
        capture_instance,
        initial_lsn,
    } = SourceExportStatementDetails::from_proto(current_value)?
    else {
        unreachable!("statement details must exist for SQL Server");
    };

    // If constraints are already populated, this statement has already been
    // migrated.
    if !table.constraints.is_empty() {
        return Ok(());
    }

    // Group the primary-key columns by their constraint name.
    let mut migrated_constraints: BTreeMap<_, Vec<_>> = BTreeMap::new();
    for col in table.columns.iter_mut() {
        if let Some(constraint_name) = col.primary_key_constraint.take() {
            migrated_constraints
                .entry(constraint_name)
                .or_default()
                .push(col.name.to_string());
        }
    }

    table.constraints = migrated_constraints
        .into_iter()
        .map(|(constraint_name, column_names)| SqlServerTableConstraint {
            constraint_name: constraint_name.to_string(),
            constraint_type: SqlServerTableConstraintType::PrimaryKey,
            column_names,
        })
        .collect();
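
    // E.g. (sketch): columns `a` and `b` both tagged with primary-key
    // constraint `pk_t` collapse into a single
    // `SqlServerTableConstraint { constraint_name: "pk_t", column_names: ["a", "b"], .. }`.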

    // Re-encode the rewritten details back into the statement.
    let new_value = SourceExportStatementDetails::SqlServer {
        table,
        capture_instance,
        initial_lsn,
    };
    *details = hex::encode(new_value.into_proto().encode_to_vec());

    Ok(())
}