use std::collections::BTreeMap;

use base64::prelude::*;
use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::{MOCK_AUTHENTICATION_NONCE_KEY, Transaction};
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{
    CreateSinkOptionName, CreateViewStatement, CteBlock, DeferredItemName, IfExistsBehavior, Query,
    SetExpr, SqlServerConfigOptionName, ViewDefinition,
};
use mz_sql::catalog::SessionCatalog;
use mz_sql::names::{FullItemName, QualifiedItemName};
use mz_sql::normalize;
use mz_sql::session::vars::{FORCE_SOURCE_TABLE_SYNTAX, Var, VarInput};
use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::sources::SourceExportStatementDetails;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use prost::Message;
use semver::Version;
use tracing::info;
use uuid::Uuid;

use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};

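/// Applies `f` to the parsed `CREATE` statement of every user item in `tx` and
/// stages the re-serialized results for commit.
///
/// For example, the sink migration below is applied as:
///
/// ```ignore
/// rewrite_ast_items(tx, |_tx, _id, stmt| {
///     ast_rewrite_create_sink_partition_strategy(stmt)?;
///     Ok(())
/// })?;
/// ```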
fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();

    for mut item in tx.get_items() {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

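/// Like [`rewrite_ast_items`], but additionally passes a [`ConnCatalog`] to
/// `f` for migrations that need to resolve names against the catalog.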
fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a ConnCatalog<'_>,
        CatalogItemId,
        &'a mut Statement<Raw>,
    ) -> Result<(), anyhow::Error>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

        f(tx, cat, item.id, &mut stmt)?;

        item.create_sql = stmt.to_ast_string_stable();

        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}

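/// The results of a catalog migration.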
pub(crate) struct MigrateResult {
    /// Builtin table updates generated while applying the migrated items.
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    /// State updates that must be applied after the item updates.
    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}

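/// Migrates all user items and loads them into `state`.
///
/// Returns the builtin table updates corresponding to the migrated items,
/// plus any updates that must be applied after the item updates.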
pub(crate) async fn migrate(
    state: &mut CatalogState,
    tx: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    item_updates: Vec<StateUpdate>,
    _now: NowFn,
    _boot_ts: Timestamp,
) -> Result<MigrateResult, anyhow::Error> {
    let catalog_version = tx.get_catalog_content_version();
    let catalog_version = match catalog_version {
        Some(v) => Version::parse(v)?,
        None => Version::new(0, 0, 0),
    };

    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );

    rewrite_ast_items(tx, |_tx, _id, stmt| {
        // Add per-item AST migrations here; mutations to `stmt` are staged
        // for commit to the catalog.
        ast_rewrite_create_sink_partition_strategy(stmt)?;
        Ok(())
    })?;

    let commit_ts = tx.upper();
    // Consolidate the pre-existing item updates with the updates just staged
    // by the AST migrations so that no item is applied twice.
    let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
    let op_item_updates = tx.get_and_commit_op_updates();
    let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
    item_updates.extend(op_item_updates);
    differential_dataflow::consolidation::consolidate_updates(&mut item_updates);

    // Set aside storage collection metadata updates; they are returned to the
    // caller and applied separately, after the item updates.
    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
        .into_iter()
        .partition(|(kind, _, _)| {
            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
        });

    let item_updates = item_updates
        .into_iter()
        .map(|(kind, ts, diff)| StateUpdate {
            kind: kind.into(),
            ts,
            diff: diff.try_into().expect("valid diff"),
        })
        .collect();

    // If the new source table syntax is being forced, temporarily disable the
    // flag so that statements written in the old syntax can still be planned
    // while the updates are applied; it is restored after the rewrite below.
    let force_source_table_syntax = state.system_config().force_source_table_syntax();
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("off"))
            .expect("known parameter");
    }

    let mut ast_builtin_table_updates = state
        .apply_updates_for_bootstrap(item_updates, local_expr_cache)
        .await;

    info!("migrating from catalog version {:?}", catalog_version);

    let conn_cat = state.for_system_session();

    // Migrate old-style sources and subsources to the new table-from-source
    // model before any other item migrations run.
    if force_source_table_syntax {
        rewrite_sources_to_tables(tx, &conn_cat)?;
    }

    rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
        // Add per-item migrations that need catalog access here.
        let _catalog_version = catalog_version.clone();
        Ok(())
    })?;

    // Restore the flag disabled above.
    if force_source_table_syntax {
        state
            .system_config_mut()
            .set(FORCE_SOURCE_TABLE_SYNTAX.name(), VarInput::Flat("on"))
            .expect("known parameter");
    }

    // Apply the updates produced by the migrations above and collect the
    // resulting builtin table updates.
    let op_item_updates = tx.get_and_commit_op_updates();
    let item_builtin_table_updates = state
        .apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
        .await;

    ast_builtin_table_updates.extend(item_builtin_table_updates);

    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );
    Ok(MigrateResult {
        builtin_table_updates: ast_builtin_table_updates,
        post_item_updates,
    })
}

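/// Rewrites all sources and subsources in `tx` to the new table-from-source
/// model:
///
/// * For Kafka and single-output load generator sources, the former progress
///   subsource takes over the underlying `CREATE SOURCE` statement, and the
///   source itself becomes a `CREATE TABLE ... FROM SOURCE` that reads from
///   it. Each item keeps its existing name.
/// * For multi-output sources (Postgres, MySQL, SQL Server, and multi-output
///   load generators), the `CREATE SOURCE` statement likewise moves onto the
///   former progress subsource's ID, and the two items swap names so the
///   user-facing source name is preserved; the original source ID is left
///   holding a placeholder view definition.
/// * All remaining subsources become `CREATE TABLE ... FROM SOURCE`
///   statements that reference the migrated source.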
fn rewrite_sources_to_tables(
    tx: &mut Transaction<'_>,
    catalog: &ConnCatalog<'_>,
) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
        UnresolvedItemName, Value, WithOptionValue,
    };

    let mut updated_items = BTreeMap::new();

    let mut sources = vec![];
    let mut subsources = vec![];

    for item in tx.get_items() {
        let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        match stmt {
            Statement::CreateSubsource(stmt) => subsources.push((item, stmt)),
            Statement::CreateSource(stmt) => sources.push((item, stmt)),
            _ => {}
        }
    }

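    // First pass: rewrite each primary `CREATE SOURCE` statement, staging the
    // replacement statement for its progress subsource in
    // `pending_progress_items`.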
    let mut pending_progress_items = BTreeMap::new();
    let mut migrated_source_ids = BTreeMap::new();
    for (mut source_item, source_stmt) in sources {
        let CreateSourceStatement {
            name,
            in_cluster,
            col_names,
            mut connection,
            include_metadata,
            format,
            envelope,
            if_not_exists,
            key_constraint,
            with_options,
            external_references,
            progress_subsource,
        } = source_stmt;

        // A source without a progress subsource has already been migrated.
        let (progress_name, progress_item) = match progress_subsource {
            Some(DeferredItemName::Named(RawItemName::Name(name))) => {
                let partial_name = normalize::unresolved_item_name(name.clone())?;
                (name, catalog.resolve_item(&partial_name)?)
            }
            Some(DeferredItemName::Named(RawItemName::Id(id, name, _))) => {
                let gid = id.parse()?;
                (name, catalog.get_item(&gid))
            }
            Some(DeferredItemName::Deferred(_)) => {
                unreachable!("invalid progress subsource")
            }
            None => {
                info!("migrate: skipping already migrated source: {name}");
                continue;
            }
        };
        let raw_progress_name =
            RawItemName::Id(progress_item.id().to_string(), progress_name.clone(), None);

        // Resolve the source's fully qualified name for use in the rewritten
        // statements.
        let catalog_item = catalog.get_item(&source_item.id);
        let source_name: &QualifiedItemName = catalog_item.name();
        let full_source_name: FullItemName = catalog.resolve_full_name(source_name);
        let source_name: UnresolvedItemName = normalize::unresolve(full_source_name.clone());

        // Text- and exclude-column options move onto the individual tables,
        // so drop them from the source statement.
        match &mut connection {
            CreateSourceConnection::Postgres { options, .. } => {
                options.retain(|o| match o.name {
                    PgConfigOptionName::Details | PgConfigOptionName::Publication => true,
                    PgConfigOptionName::TextColumns | PgConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::SqlServer { options, .. } => {
                options.retain(|o| match o.name {
                    SqlServerConfigOptionName::Details => true,
                    SqlServerConfigOptionName::TextColumns
                    | SqlServerConfigOptionName::ExcludeColumns => false,
                });
            }
            CreateSourceConnection::MySql { options, .. } => {
                options.retain(|o| match o.name {
                    MySqlConfigOptionName::Details => true,
                    MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns => {
                        false
                    }
                });
            }
            CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. } => {
            }
        }

        // Construct the replacement statements for the source and its
        // progress subsource.
        let (new_progress_name, new_progress_stmt, new_source_name, new_source_stmt) =
            match connection {
                connection @ (CreateSourceConnection::Postgres { .. }
                | CreateSourceConnection::MySql { .. }
                | CreateSourceConnection::SqlServer { .. }
                | CreateSourceConnection::LoadGenerator {
                    generator:
                        LoadGenerator::Tpch | LoadGenerator::Auction | LoadGenerator::Marketing,
                    ..
                }) => {
                    // None of these are ever set on multi-output sources.
                    assert_eq!(col_names, &[]);
                    assert_eq!(key_constraint, None);
                    assert_eq!(format, None);
                    assert_eq!(envelope, None);
                    assert_eq!(include_metadata, &[]);
                    assert_eq!(external_references, None);

                    // The ingestion moves to the progress item's ID below, so
                    // leave a placeholder view definition on the original
                    // source ID that points at it. The two items swap names,
                    // preserving the user-facing source name.
                    let dummy_source_stmt = Statement::CreateView(CreateViewStatement {
                        if_exists: IfExistsBehavior::Error,
                        temporary: false,
                        definition: ViewDefinition {
                            name: progress_name,
                            columns: vec![],
                            query: Query {
                                ctes: CteBlock::Simple(vec![]),
                                body: SetExpr::Table(RawItemName::Id(
                                    progress_item.id().to_string(),
                                    source_name.clone(),
                                    None,
                                )),
                                order_by: vec![],
                                limit: None,
                                offset: None,
                            },
                        },
                    });

                    let new_progress_stmt = CreateSourceStatement {
                        name: source_name.clone(),
                        in_cluster,
                        col_names: vec![],
                        connection,
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };

                    migrated_source_ids.insert(source_item.id, progress_item.id());

                    (
                        full_source_name.item,
                        new_progress_stmt,
                        progress_item.name().item.clone(),
                        dummy_source_stmt,
                    )
                }
                CreateSourceConnection::Kafka {
                    options,
                    connection,
                } => {
                    // Key constraints, if any, are not carried over to the
                    // new table statement.
                    let constraints = vec![];

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // Encode the export details carried by the new table
                    // statement.
                    let details = SourceExportStatementDetails::Kafka {};
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];

                    // The Kafka topic becomes the table's external reference.
                    let topic_option = options
                        .iter()
                        .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
                        .expect("kafka sources must have a topic");
                    let topic = match &topic_option.value {
                        Some(WithOptionValue::Value(Value::String(topic))) => topic,
                        _ => unreachable!("topic must be a string"),
                    };
                    let external_reference = UnresolvedItemName::qualified(&[Ident::new(topic)?]);

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::Kafka {
                            options,
                            connection,
                        },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
                CreateSourceConnection::LoadGenerator {
                    generator:
                        generator @ (LoadGenerator::Clock
                        | LoadGenerator::Counter
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue),
                    options,
                } => {
                    // Key constraints, if any, are not carried over to the
                    // new table statement.
                    let constraints = vec![];

                    let columns = if col_names.is_empty() {
                        TableFromSourceColumns::NotSpecified
                    } else {
                        TableFromSourceColumns::Named(col_names)
                    };

                    // Single-output load generators write to the default
                    // output.
                    let details = SourceExportStatementDetails::LoadGenerator {
                        output: LoadGeneratorOutput::Default,
                    };
                    let table_with_options = vec![TableFromSourceOption {
                        name: TableFromSourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(hex::encode(
                            details.into_proto().encode_to_vec(),
                        )))),
                    }];

                    // Load generator tables are referenced through the
                    // virtual load-generator database and schema.
                    let external_reference = FullItemName {
                        database: mz_sql::names::RawDatabaseSpecifier::Name(
                            mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                                .to_owned(),
                        ),
                        schema: generator.schema_name().to_string(),
                        item: generator.schema_name().to_string(),
                    };

                    let new_source_stmt =
                        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
                            name: source_name,
                            constraints,
                            columns,
                            if_not_exists,
                            source: raw_progress_name,
                            include_metadata,
                            format,
                            envelope,
                            external_reference: Some(external_reference.into()),
                            with_options: table_with_options,
                        });

                    let new_progress_stmt = CreateSourceStatement {
                        name: progress_name,
                        in_cluster,
                        col_names: vec![],
                        connection: CreateSourceConnection::LoadGenerator { generator, options },
                        include_metadata: vec![],
                        format: None,
                        envelope: None,
                        if_not_exists,
                        key_constraint: None,
                        with_options,
                        external_references: None,
                        progress_subsource: None,
                    };
                    (
                        progress_item.name().item.clone(),
                        new_progress_stmt,
                        full_source_name.item,
                        new_source_stmt,
                    )
                }
            };


        info!(
            "migrate: converted source {} to {}",
            source_item.create_sql, new_source_stmt
        );
        source_item.name = new_source_name.clone();
        source_item.create_sql = new_source_stmt.to_ast_string_stable();
        updated_items.insert(source_item.id, source_item);
        // The progress item's replacement statement is applied in the second
        // pass below.
        pending_progress_items.insert(progress_item.id(), (new_progress_name, new_progress_stmt));
    }

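    // Second pass: rewrite subsources. Progress subsources take over the
    // primary source statements staged above; all other subsources become
    // `CREATE TABLE ... FROM SOURCE` statements.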
    for (mut item, stmt) in subsources {
        match stmt {
            // Progress subsources have no `of_source`; they receive the
            // statement staged for them in the first pass.
            CreateSubsourceStatement {
                of_source: None, ..
            } => {
                let Some((new_name, new_stmt)) = pending_progress_items.remove(&item.id) else {
                    panic!("encountered orphan progress subsource id: {}", item.id)
                };
                item.name = new_name;
                item.create_sql = new_stmt.to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
            // Export subsources become tables that read from the migrated
            // source.
            CreateSubsourceStatement {
                name,
                columns,
                constraints,
                of_source: Some(raw_source_name),
                if_not_exists,
                mut with_options,
            } => {
                // Point the table at the item that now owns the ingestion.
                let new_raw_source_name = match raw_source_name {
                    RawItemName::Id(old_id, name, None) => {
                        let old_id: CatalogItemId = old_id.parse().expect("well formed");
                        let new_id = migrated_source_ids[&old_id].clone();
                        RawItemName::Id(new_id.to_string(), name, None)
                    }
                    _ => unreachable!("unexpected source name: {raw_source_name}"),
                };
                // The external reference moves from a `WITH` option to a
                // top-level field on the table statement.
                let external_reference = match with_options
                    .iter()
                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
                {
                    Some(i) => match with_options.remove(i).value {
                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
                        _ => unreachable!("external reference must be an unresolved item name"),
                    },
                    None => panic!("subsource must have an external reference"),
                };

                // The remaining options map one-to-one onto table options.
                let with_options = with_options
                    .into_iter()
                    .map(|option| {
                        match option.name {
                            CreateSubsourceOptionName::Details => TableFromSourceOption {
                                name: TableFromSourceOptionName::Details,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::TextColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
                                name: TableFromSourceOptionName::ExcludeColumns,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::RetainHistory => TableFromSourceOption {
                                name: TableFromSourceOptionName::RetainHistory,
                                value: option.value,
                            },
                            CreateSubsourceOptionName::Progress => {
                                panic!("progress option should not exist on this subsource")
                            }
                            CreateSubsourceOptionName::ExternalReference => {
                                unreachable!("this option is handled separately above")
                            }
                        }
                    })
                    .collect::<Vec<_>>();


                let table = CreateTableFromSourceStatement {
                    name,
                    constraints,
                    columns: TableFromSourceColumns::Defined(columns),
                    if_not_exists,
                    source: new_raw_source_name,
                    external_reference: Some(external_reference),
                    with_options,
                    // Subsources never had formats, envelopes, or metadata
                    // columns.
                    envelope: None,
                    include_metadata: vec![],
                    format: None,
                };

                info!(
                    "migrate: converted subsource {} to table {}",
                    item.create_sql, table
                );
                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
                updated_items.insert(item.id, item);
            }
        }
    }
    assert!(
        pending_progress_items.is_empty(),
        "unexpected residual progress items: {pending_progress_items:?}"
    );

    tx.update_items(updated_items)?;

    Ok(())
}

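/// Migrations that run directly against the durable catalog `tx`,
/// independent of the in-memory catalog state.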
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    _organization_id: Uuid,
    _boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    // One-time migration: move the expression cache to a fresh shard,
    // finalizing the old one.
    const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
    const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
    if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
        if let Some(shard_id) = tx.get_expression_cache_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_expression_cache_shard(ShardId::new())?;
        }
        tx.set_config(
            EXPR_CACHE_MIGRATION_KEY.to_string(),
            Some(EXPR_CACHE_MIGRATION_DONE),
        )?;
    }

    // One-time migration: likewise move the builtin migration shard to a
    // fresh shard.
    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
    {
        if let Some(shard_id) = tx.get_builtin_migration_shard() {
            tx.mark_shards_as_finalized(btreeset! {shard_id});
            tx.set_builtin_migration_shard(ShardId::new())?;
        }
        tx.set_config(
            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
        )?;
    }

    // Generate and persist a nonce for mock authentication if one is not
    // already set.
    if tx
        .get_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string())
        .is_none()
    {
        let mut nonce = [0u8; 24];
        openssl::rand::rand_bytes(&mut nonce).expect("failed to generate nonce");
        let nonce = BASE64_STANDARD.encode(nonce);
        tx.set_setting(MOCK_AUTHENTICATION_NONCE_KEY.to_string(), Some(nonce))?;
    }

    Ok(())
}

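/// Removes the deprecated `PARTITION STRATEGY` option from stored
/// `CREATE SINK` statements; the option is no longer supported.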
fn ast_rewrite_create_sink_partition_strategy(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    let Statement::CreateSink(stmt) = stmt else {
        return Ok(());
    };
    stmt.with_options
        .retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
    Ok(())
}