1use std::collections::{BTreeMap, BTreeSet};
32use std::sync::{Arc, LazyLock};
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39 BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40 MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
41};
42use mz_catalog::config::BuiltinItemMigrationConfig;
43use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
44use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
45use mz_catalog::memory::error::{Error, ErrorKind};
46use mz_ore::soft_assert_or_log;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
48use mz_persist_client::critical::{Opaque, SinceHandle};
49use mz_persist_client::read::ReadHandle;
50use mz_persist_client::schema::CaESchema;
51use mz_persist_client::write::WriteHandle;
52use mz_persist_client::{Diagnostics, PersistClient};
53use mz_persist_types::ShardId;
54use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
55use mz_persist_types::schema::backward_compatible;
56use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
57use mz_repr::{CatalogItemId, GlobalId, Timestamp};
58use mz_sql::catalog::{CatalogItemType, NameReference};
59use mz_storage_client::controller::StorageTxn;
60use mz_storage_types::StorageDiff;
61use mz_storage_types::sources::SourceData;
62use semver::Version;
63use timely::progress::Antichain;
64use tracing::{debug, info};
65
66use crate::catalog::migrate::get_migration_version;
67
/// The ordered list of builtin schema migration steps.
///
/// Each step records the first build version whose builtin definition differs
/// from earlier versions, plus the mechanism used to migrate the object's
/// storage collection. On upgrade, all steps with a version greater than the
/// durably recorded catalog version are applied (see
/// `Migration::plan_migration`). Step versions must never exceed the current
/// build version (enforced by `Migration::validate_migration_steps`).
static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
    vec![
        MigrationStep::replacement(
            "0.149.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_sink_statistics_raw",
        ),
        MigrationStep::replacement(
            "0.149.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_source_statistics_raw",
        ),
        MigrationStep::evolution(
            "0.159.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_cluster_replica_metrics_history",
        ),
        MigrationStep::replacement(
            "0.160.0",
            CatalogItemType::Table,
            MZ_CATALOG_SCHEMA,
            "mz_roles",
        ),
        MigrationStep::replacement(
            "0.160.0",
            CatalogItemType::Table,
            MZ_CATALOG_SCHEMA,
            "mz_sinks",
        ),
        MigrationStep::replacement(
            "26.18.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_databases",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_schemas",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_role_members",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_network_policies",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_network_policy_rules",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_cluster_workload_classes",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_internal_cluster_replicas",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_pending_cluster_replicas",
        ),
        MigrationStep::replacement(
            "26.20.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_materialized_views",
        ),
        MigrationStep::replacement(
            "26.22.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_connections",
        ),
        MigrationStep::replacement(
            "26.22.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_secrets",
        ),
    ]
});
175
/// A single builtin schema migration step: at `version`, migrate `object`
/// using `mechanism`.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The first build version that contains the changed builtin definition.
    version: Version,
    /// Identifies the builtin object being migrated.
    object: SystemObjectDescription,
    /// How the object's storage collection is migrated.
    mechanism: Mechanism,
}
186
187impl MigrationStep {
188 fn evolution(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
190 Self {
191 version: Version::parse(version).expect("valid"),
192 object: SystemObjectDescription {
193 schema_name: schema.into(),
194 object_type: type_,
195 object_name: name.into(),
196 },
197 mechanism: Mechanism::Evolution,
198 }
199 }
200
201 fn replacement(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
203 Self {
204 version: Version::parse(version).expect("valid"),
205 object: SystemObjectDescription {
206 schema_name: schema.into(),
207 object_type: type_,
208 object_name: name.into(),
209 },
210 mechanism: Mechanism::Replacement,
211 }
212 }
213}
214
/// The mechanism by which a builtin's storage collection is migrated.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[allow(dead_code)]
enum Mechanism {
    /// Evolve the schema of the existing persist shard in place; existing
    /// data is retained.
    Evolution,
    /// Allocate a brand-new (empty) persist shard for the collection; the old
    /// shard is marked for finalization.
    Replacement,
}
229
/// The outcome of a builtin schema migration, as reported to the caller.
pub(super) struct MigrationResult {
    /// Catalog IDs of builtins whose storage collections were replaced with
    /// new shards.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// A deferred cleanup action (retracting stale migration shard entries)
    /// for the caller to run at an appropriate time.
    pub cleanup_action: BoxFuture<'static, ()>,
}
237
238impl Default for MigrationResult {
239 fn default() -> Self {
240 Self {
241 replaced_items: Default::default(),
242 cleanup_action: async {}.boxed(),
243 }
244 }
245}
246
/// Runs any builtin schema migrations required to upgrade from the durably
/// recorded catalog version to `build_info`'s version.
///
/// Returns the set of replaced builtin items plus a deferred cleanup action.
/// Returns a default (empty) result for dummy builds and when no migration
/// version is recorded in the catalog.
pub(super) async fn run(
    build_info: &BuildInfo,
    deploy_generation: u64,
    txn: &mut Transaction<'_>,
    config: BuiltinItemMigrationConfig,
) -> Result<MigrationResult, Error> {
    // Invariant: read-only mode must operate on a savepoint transaction (and
    // vice versa), so durable state is never committed while read-only.
    assert_eq!(config.read_only, txn.is_savepoint());

    // Dummy builds carry no meaningful version; nothing to migrate.
    if *build_info == DUMMY_BUILD_INFO {
        return Ok(MigrationResult::default());
    }

    let Some(durable_version) = get_migration_version(txn) else {
        // No durable version recorded; nothing to migrate from.
        return Ok(MigrationResult::default());
    };
    let build_version = build_info.semver_version();

    // Collect the current state of every builtin system object: its global
    // ID, backing shard (if any), static definition, and recorded
    // fingerprint.
    let collection_metadata = txn.get_collection_metadata();
    let system_objects = txn
        .get_system_object_mappings()
        .map(|m| {
            let object = m.description;
            let global_id = m.unique_identifier.global_id;
            let shard_id = collection_metadata.get(&global_id).copied();
            let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
                panic!("missing builtin {object:?}");
            };
            let info = ObjectInfo {
                global_id,
                shard_id,
                builtin,
                fingerprint: m.unique_identifier.fingerprint,
            };
            (object, info)
        })
        .collect();

    let migration_shard = txn.get_builtin_migration_shard().expect("must exist");

    let migration = Migration {
        source_version: durable_version.clone(),
        target_version: build_version.clone(),
        deploy_generation,
        system_objects,
        migration_shard,
        config,
    };

    let result = migration.run(&MIGRATIONS).await.map_err(|e| {
        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
            last_seen_version: durable_version.to_string(),
            this_version: build_version.to_string(),
            cause: e.to_string(),
        })
    })?;

    // Record the migration's effects in the catalog transaction.
    result.apply(txn);

    // Builtins that received new shards had their collections replaced;
    // report their catalog IDs to the caller.
    let replaced_items = txn
        .get_system_object_mappings()
        .map(|m| m.unique_identifier)
        .filter(|ids| result.new_shards.contains_key(&ids.global_id))
        .map(|ids| ids.catalog_id)
        .collect();

    Ok(MigrationResult {
        replaced_items,
        cleanup_action: result.cleanup_action,
    })
}
326
/// The outcome of `Migration::run`, to be applied to the catalog transaction.
struct MigrationRunResult {
    /// Maps each replaced collection's `GlobalId` to its newly minted shard.
    new_shards: BTreeMap<GlobalId, ShardId>,
    /// New fingerprints to record for migrated (or otherwise changed)
    /// builtins.
    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
    /// Shards that are no longer referenced and should be finalized.
    shards_to_finalize: BTreeSet<ShardId>,
    /// Deferred retraction of stale migration shard entries.
    cleanup_action: BoxFuture<'static, ()>,
}
334
335impl Default for MigrationRunResult {
336 fn default() -> Self {
337 Self {
338 new_shards: BTreeMap::new(),
339 new_fingerprints: BTreeMap::new(),
340 shards_to_finalize: BTreeSet::new(),
341 cleanup_action: async {}.boxed(),
342 }
343 }
344}
345
impl MigrationRunResult {
    /// Applies this migration result to the given catalog transaction: swaps
    /// in the new shard mappings, marks superseded shards for finalization,
    /// and records updated fingerprints.
    fn apply(&self, txn: &mut Transaction<'_>) {
        // Swap collection metadata: delete the old shard mappings of all
        // replaced collections, then insert the new ones.
        let replaced_ids = self.new_shards.keys().copied().collect();
        let old_metadata = txn.delete_collection_metadata(replaced_ids);
        txn.insert_collection_metadata(self.new_shards.clone())
            .expect("inserting unique shards IDs after deleting existing entries");

        // Both the shards we just unmapped and any shards flagged by the
        // migration itself must eventually be finalized.
        let mut unfinalized_shards: BTreeSet<_> =
            old_metadata.into_iter().map(|(_, sid)| sid).collect();
        unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
        txn.insert_unfinalized_shards(unfinalized_shards)
            .expect("cannot fail");

        // Rewrite the system object mappings of every object that received a
        // new fingerprint, leaving its catalog and global IDs untouched.
        let mappings = txn
            .get_system_object_mappings()
            .filter_map(|m| {
                let fingerprint = self.new_fingerprints.get(&m.description)?;
                Some(SystemObjectMapping {
                    description: m.description,
                    unique_identifier: SystemObjectUniqueIdentifier {
                        catalog_id: m.unique_identifier.catalog_id,
                        global_id: m.unique_identifier.global_id,
                        fingerprint: fingerprint.clone(),
                    },
                })
            })
            .collect();
        txn.set_system_object_mappings(mappings)
            .expect("filtered existing mappings remain unique");
    }
}
381
/// Per-builtin state collected from the catalog transaction and the static
/// builtin definitions.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// The `GlobalId` of the object's current storage collection.
    global_id: GlobalId,
    /// The persist shard backing the collection, if one is recorded in the
    /// catalog's collection metadata.
    shard_id: Option<ShardId>,
    /// The static definition of the builtin.
    builtin: &'static Builtin<NameReference>,
    /// The fingerprint durably recorded for this builtin.
    fingerprint: String,
}
390
/// Context for a single builtin schema migration run.
struct Migration {
    /// The version recorded in the durable catalog (upgrading from).
    source_version: Version,
    /// The version of the running build (upgrading to).
    target_version: Version,
    /// The current deploy generation; part of the migration shard key.
    deploy_generation: u64,
    /// All builtin system objects, keyed by their description.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// The shard used to coordinate replacement shard assignment.
    migration_shard: ShardId,
    /// Migration configuration (persist client, read-only flag, etc.).
    config: BuiltinItemMigrationConfig,
}
411
412impl Migration {
    /// Runs the migration: validates and plans the applicable steps, executes
    /// them, and computes updated fingerprints plus cleanup work.
    ///
    /// Returns an empty result when already at the target version (unless a
    /// forced migration is configured) and fails on attempted downgrades.
    async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
        info!(
            deploy_generation = %self.deploy_generation,
            "running builtin schema migration: {} -> {}",
            self.source_version, self.target_version
        );

        self.validate_migration_steps(steps);

        // `force_migration` migrates every eligible object with the requested
        // mechanism instead of following the step list.
        let (force, plan) = match self.config.force_migration.as_deref() {
            None => (false, self.plan_migration(steps)),
            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
            Some(other) => panic!("unknown force migration mechanism: {other}"),
        };

        if self.source_version == self.target_version && !force {
            info!("skipping migration: already at target version");
            return Ok(MigrationRunResult::default());
        } else if self.source_version > self.target_version {
            bail!("downgrade not supported");
        }

        // Never write in read-only mode.
        if !self.config.read_only {
            self.upgrade_migration_shard_version().await;
        }

        info!("executing migration plan: {plan:?}");

        self.migrate_evolve(&plan.evolve).await?;
        let new_shards = self.migrate_replace(&plan.replace).await?;

        let mut migrated_objects = BTreeSet::new();
        migrated_objects.extend(plan.evolve);
        migrated_objects.extend(plan.replace);

        let new_fingerprints = self.update_fingerprints(&migrated_objects)?;

        let (shards_to_finalize, cleanup_action) = self.cleanup().await?;

        Ok(MigrationRunResult {
            new_shards,
            new_fingerprints,
            shards_to_finalize,
            cleanup_action,
        })
    }
462
    /// Validates the static migration step list against this build.
    ///
    /// Panics when a step targets a future version, a forbidden or
    /// non-existent builtin, or a builtin kind without a storage collection —
    /// all programming errors in the step list.
    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
        for step in steps {
            assert!(
                step.version <= self.target_version,
                "migration step version greater than target version: {} > {}",
                step.version,
                self.target_version,
            );

            let object = &step.object;

            // Replacing this table's shard would drop its contents.
            assert_ne!(
                &*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
            );

            assert_ne!(
                &*MZ_CATALOG_RAW_DESCRIPTION, object,
                "mz_catalog_raw cannot be migrated"
            );

            let Some(object_info) = self.system_objects.get(object) else {
                panic!("migration step for non-existent builtin: {object:?}");
            };

            // Only builtins backed by a storage collection can be migrated.
            let builtin = object_info.builtin;
            use Builtin::*;
            assert!(
                matches!(
                    builtin,
                    Table(..) | Source(..) | MaterializedView(..) | ContinualTask(..)
                ),
                "schema migration not supported for builtin: {builtin:?}",
            );
        }
    }
511
512 fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
514 let steps = steps.iter().filter(|s| s.version > self.source_version);
516
517 let mut by_object = BTreeMap::new();
521 for step in steps {
522 if let Some(entry) = by_object.get_mut(&step.object) {
523 *entry = match (step.mechanism, *entry) {
524 (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
525 (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
526 Mechanism::Replacement
527 }
528 };
529 } else {
530 by_object.insert(step.object.clone(), step.mechanism);
531 }
532 }
533
534 let mut plan = Plan::default();
535 for (object, mechanism) in by_object {
536 match mechanism {
537 Mechanism::Evolution => plan.evolve.push(object),
538 Mechanism::Replacement => plan.replace.push(object),
539 }
540 }
541
542 plan
543 }
544
545 fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
547 let objects = self
548 .system_objects
549 .iter()
550 .filter(|(_, info)| {
551 use Builtin::*;
552 match info.builtin {
553 Table(..) | MaterializedView(..) | ContinualTask(..) => true,
554 Source(source) => **source != *MZ_CATALOG_RAW,
555 Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
556 }
557 })
558 .map(|(object, _)| object.clone())
559 .collect();
560
561 let mut plan = Plan::default();
562 match mechanism {
563 Mechanism::Evolution => plan.evolve = objects,
564 Mechanism::Replacement => plan.replace = objects,
565 }
566
567 plan
568 }
569
    /// Runs persist's version upgrade for the migration shard. Only called
    /// when not in read-only mode (see `run`).
    async fn upgrade_migration_shard_version(&self) {
        let persist = &self.config.persist_client;
        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
        };

        persist
            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
                self.migration_shard,
                diagnostics,
            )
            .await
            .expect("valid usage");
    }
586
587 async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
589 for object in objects {
590 self.migrate_evolve_one(object).await?;
591 }
592 Ok(())
593 }
594
    /// Migrates a single builtin in place, by evolving the persist schema of
    /// its existing shard to the schema of the current builtin definition.
    ///
    /// In read-only mode this only verifies that the evolution would be
    /// backward compatible; no durable state is changed.
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let persist = &self.config.persist_client;

        let Some(object_info) = self.system_objects.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let id = object_info.global_id;

        let Some(shard_id) = object_info.shard_id else {
            if self.config.read_only {
                // Read-only instances must not proceed without a shard
                // mapping; treat it as an inconsistency.
                bail!("missing shard ID for builtin {object:?} ({id})");
            } else {
                // No shard yet, so there is nothing to evolve.
                return Ok(());
            }
        };

        // The target schema is the desc of the current builtin definition.
        let target_desc = match object_info.builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::MaterializedView(mv) => &mv.desc,
            Builtin::ContinualTask(ct) => &ct.desc,
            _ => bail!("not a storage collection: {object:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        if self.config.read_only {
            // Dry run: only check that the registered schema (if any) can be
            // evolved into the target schema.
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                        {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        // Obtain a schema to evolve from. If the shard has none yet, try to
        // register the target schema directly; if that fails (e.g. a
        // concurrent writer beat us to it), fall back to the CaES loop below.
        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                if schema_id.is_some() {
                    // Registration succeeded with the target schema; there is
                    // nothing left to evolve.
                    return Ok(());
                }

                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        // Compare-and-evolve loop: retry with the observed schema whenever a
        // concurrent writer changed it between our read and our attempt.
        loop {
            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                    {source_desc:?} -> {target_desc:?}"
                ),
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    // Lost a race; retry against the schema we observed.
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }
725
    /// Migrates the given builtins by assigning each a brand-new persist
    /// shard, coordinating the assignment through the migration shard so that
    /// concurrent processes agree on the same replacements.
    ///
    /// Returns the new shard for each replaced collection's `GlobalId`.
    async fn migrate_replace(
        &self,
        objects: &[SystemObjectDescription],
    ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
        if objects.is_empty() {
            return Ok(Default::default());
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;

        // Resolve each object to the GlobalId of its current collection.
        let mut ids_to_replace = BTreeSet::new();
        for object in objects {
            if let Some(info) = self.system_objects.get(object) {
                ids_to_replace.insert(info.global_id);
            } else {
                bail!("missing id for builtin {object:?}");
            }
        }

        info!(?objects, ?ids_to_replace, "migrating by replacement");

        // Retry until our shard assignments are durably recorded, or until we
        // observe assignments recorded by a concurrent process.
        let replaced_shards = loop {
            if let Some(shards) = self
                .try_get_or_insert_replacement_shards(
                    &ids_to_replace,
                    &mut persist_write,
                    &mut persist_read,
                )
                .await?
            {
                break shards;
            }
        };

        Ok(replaced_shards)
    }
770
    /// Makes one attempt at determining the replacement shards for the given
    /// collection IDs.
    ///
    /// First consults the migration shard for assignments already recorded
    /// for our target version and deploy generation (e.g. by a concurrent or
    /// previous process). Any IDs still unassigned are given fresh shards,
    /// which are recorded via compare-and-append.
    ///
    /// Returns `Ok(None)` when the compare-and-append lost a race; the caller
    /// should retry and will then observe the other writer's entries.
    async fn try_get_or_insert_replacement_shards(
        &self,
        ids_to_replace: &BTreeSet<GlobalId>,
        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
        let upper = persist_write.fetch_recent_upper().await;
        let write_ts = *upper.as_option().expect("migration shard not sealed");

        let mut ids_to_replace = ids_to_replace.clone();
        let mut replaced_shards = BTreeMap::new();

        // Look for assignments already recorded for our target version and
        // deploy generation. `step_back` returns `None` only when the shard
        // was never written to, in which case there is nothing to read.
        if let Some(read_ts) = write_ts.step_back() {
            let pred = |key: &migration_shard::Key| {
                key.build_version == self.target_version
                    && key.deploy_generation == Some(self.deploy_generation)
            };
            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
                for (key, shard_id) in entries {
                    let id = GlobalId::System(key.global_id);
                    if ids_to_replace.remove(&id) {
                        replaced_shards.insert(id, shard_id);
                    }
                }

                debug!(
                    %read_ts, ?replaced_shards, ?ids_to_replace,
                    "found existing entries in migration shard",
                );
            }

            if ids_to_replace.is_empty() {
                return Ok(Some(replaced_shards));
            }
        }

        // Mint new shards for the remaining IDs and prepare to record them in
        // the migration shard.
        let mut updates = Vec::new();
        for id in ids_to_replace {
            let shard_id = ShardId::new();
            replaced_shards.insert(id, shard_id);

            let GlobalId::System(global_id) = id else {
                bail!("attempt to migrate a non-system collection: {id}");
            };
            let key = migration_shard::Key {
                global_id,
                build_version: self.target_version.clone(),
                deploy_generation: Some(self.deploy_generation),
            };
            updates.push(((key, shard_id), write_ts, 1));
        }

        let upper = Antichain::from_elem(write_ts);
        let new_upper = Antichain::from_elem(write_ts.step_forward());
        debug!(%write_ts, "attempting insert into migration shard");
        let result = persist_write
            .compare_and_append(updates, upper, new_upper)
            .await
            .expect("valid usage");

        match result {
            Ok(()) => {
                debug!(
                    %write_ts, ?replaced_shards,
                    "successfully inserted into migration shard"
                );
                Ok(Some(replaced_shards))
            }
            // A concurrent writer advanced the upper; the caller must retry.
            Err(_mismatch) => Ok(None),
        }
    }
861
    /// Opens a write handle and a read handle for the migration shard.
    async fn open_migration_shard(
        &self,
        diagnostics: Diagnostics,
    ) -> (
        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) {
        let persist = &self.config.persist_client;

        persist
            .open(
                self.migration_shard,
                Arc::new(migration_shard::KeySchema),
                Arc::new(ShardIdSchema),
                diagnostics,
                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
            )
            .await
            .expect("valid usage")
    }
883
884 async fn open_migration_shard_since(
886 &self,
887 diagnostics: Diagnostics,
888 ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
889 self.config
890 .persist_client
891 .open_critical_since(
892 self.migration_shard,
893 PersistClient::CONTROLLER_CRITICAL_SINCE,
896 Opaque::encode(&i64::MIN),
897 diagnostics.clone(),
898 )
899 .await
900 .expect("valid usage")
901 }
902
903 fn update_fingerprints(
908 &self,
909 migrated_items: &BTreeSet<SystemObjectDescription>,
910 ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
911 let mut new_fingerprints = BTreeMap::new();
912 for (object, object_info) in &self.system_objects {
913 let id = object_info.global_id;
914 let builtin = object_info.builtin;
915
916 let fingerprint = builtin.fingerprint();
917 if fingerprint == object_info.fingerprint {
918 continue; }
920
921 let migrated = migrated_items.contains(object);
923 let ephemeral = matches!(
925 builtin,
926 Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
927 );
928
929 if migrated || ephemeral {
930 new_fingerprints.insert(object.clone(), fingerprint);
931 } else if builtin.runtime_alterable() {
932 assert_eq!(
935 object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
936 "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
937 );
938 } else {
939 panic!(
940 "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
941 fingerprint, object_info.fingerprint,
942 );
943 }
944 }
945
946 Ok(new_fingerprints)
947 }
948
    /// Prepares cleanup of the migration shard: finds entries left behind by
    /// builds older than the target version, determines which of their shards
    /// are unused and can be finalized, and returns a deferred action that
    /// retracts the stale entries. Also downgrades the shard's since to allow
    /// compaction.
    ///
    /// Returns a no-op result in read-only mode or when there is nothing to
    /// clean up.
    async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
        let noop_action = async {}.boxed();
        let noop_result = (BTreeSet::new(), noop_action);

        if self.config.read_only {
            return Ok(noop_result);
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: "builtin schema migration cleanup".into(),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;
        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;

        let upper = persist_write.fetch_recent_upper().await.clone();
        let write_ts = *upper.as_option().expect("migration shard not sealed");
        let Some(read_ts) = write_ts.step_back() else {
            // The shard has never been written to; nothing to clean up.
            return Ok(noop_result);
        };

        // Entries recorded by builds older than the target version are stale.
        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
        else {
            return Ok(noop_result);
        };

        debug!(
            ?stale_entries,
            "cleaning migration shard up to version {}", self.target_version,
        );

        let current_shards: BTreeMap<_, _> = self
            .system_objects
            .values()
            .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
            .collect();

        let mut shards_to_finalize = BTreeSet::new();
        let mut retractions = Vec::new();
        for (key, shard_id) in stale_entries {
            // Only finalize shards that are not currently in use by any
            // collection; a stale entry may still reference an active shard.
            let gid = GlobalId::System(key.global_id);
            if current_shards.get(&gid) != Some(&shard_id) {
                shards_to_finalize.insert(shard_id);
            }

            retractions.push(((key, shard_id), write_ts, -1));
        }

        // NOTE(review): the retraction is packaged as a deferred action —
        // presumably so it only runs once the catalog transaction has
        // committed; confirm with the caller. A lost compare-and-append race
        // here is harmless and only logged.
        let cleanup_action = async move {
            if !retractions.is_empty() {
                let new_upper = Antichain::from_elem(write_ts.step_forward());
                let result = persist_write
                    .compare_and_append(retractions, upper, new_upper)
                    .await
                    .expect("valid usage");
                match result {
                    Ok(()) => debug!("cleaned up migration shard"),
                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
                }
            }
        }
        .boxed();

        // Allow compaction of everything up to our read timestamp.
        let o = persist_since.opaque().clone();
        let new_since = Antichain::from_elem(read_ts);
        let result = persist_since
            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
            .await;
        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");

        Ok((shards_to_finalize, cleanup_action))
    }
1044}
1045
1046async fn read_migration_shard<P>(
1052 persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
1053 read_ts: Timestamp,
1054 predicate: P,
1055) -> Option<Vec<(migration_shard::Key, ShardId)>>
1056where
1057 P: for<'a> Fn(&migration_shard::Key) -> bool,
1058{
1059 let as_of = Antichain::from_elem(read_ts);
1060 let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
1061
1062 assert!(
1063 updates.iter().all(|(_, _, diff)| *diff == 1),
1064 "migration shard contains invalid diffs: {updates:?}",
1065 );
1066
1067 let entries: Vec<_> = updates
1068 .into_iter()
1069 .map(|(data, _, _)| data)
1070 .filter(move |(key, _)| predicate(key))
1071 .collect();
1072
1073 (!entries.is_empty()).then_some(entries)
1074}
1075
/// The set of objects to migrate, grouped by mechanism.
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate in place via persist schema evolution.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate by replacing their backing shard.
    replace: Vec<SystemObjectDescription>,
}
1084
/// Key and codec types for the builtin migration shard, which records the
/// replacement `ShardId` assigned to each migrated collection.
mod migration_shard {
    use std::fmt;
    use std::str::FromStr;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;
    use semver::Version;
    use serde::{Deserialize, Serialize};

    /// The key under which a replacement shard assignment is stored.
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
    pub(super) struct Key {
        /// The raw `GlobalId::System` value of the migrated collection.
        pub(super) global_id: u64,
        /// The build version that recorded the assignment.
        pub(super) build_version: Version,
        /// The deploy generation that recorded the assignment. `None` for
        /// legacy entries written in the old `{id}-{version}` string format
        /// (see `FromStr` below).
        pub(super) deploy_generation: Option<u64>,
    }

    impl fmt::Display for Key {
        /// Keys with a deploy generation serialize as JSON; legacy keys
        /// (no generation) use the historical `{global_id}-{build_version}`
        /// format so existing shard contents keep round-tripping.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            if self.deploy_generation.is_some() {
                let s = serde_json::to_string(self).expect("JSON serializable");
                f.write_str(&s)
            } else {
                write!(f, "{}-{}", self.global_id, self.build_version)
            }
        }
    }

    impl FromStr for Key {
        type Err = String;

        /// Parses either encoding produced by `Display`: JSON first, then the
        /// legacy dash-separated format (which implies no deploy generation).
        fn from_str(s: &str) -> Result<Self, String> {
            if let Ok(key) = serde_json::from_str(s) {
                return Ok(key);
            };

            // `splitn(2, '-')` keeps any further '-' (e.g. in "-dev.0"
            // pre-release versions) inside the version component.
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid Key '{s}'"));
            };
            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
            let build_version = build_version
                .parse::<Version>()
                .map_err(|e| e.to_string())?;
            Ok(Key {
                global_id,
                build_version,
                deploy_generation: None,
            })
        }
    }

    impl Default for Key {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: Version::new(0, 0, 0),
                deploy_generation: Some(0),
            }
        }
    }

    impl Codec for Key {
        type Schema = KeySchema;
        type Storage = ();

        fn codec_name() -> String {
            // NOTE(review): presumably kept as "TableKey" for compatibility
            // with data already written to the shard — confirm before
            // renaming.
            "TableKey".into()
        }

        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put(self.to_string().as_bytes())
        }

        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
            s.parse()
        }

        fn encode_schema(_schema: &KeySchema) -> Bytes {
            Bytes::new()
        }

        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            KeySchema
        }
    }

    // Columnar representation: keys are stored as their string encoding.
    impl SimpleColumnarData for Key {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }

        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }

        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("valid Key");
        }
    }

    /// Persist schema for [`Key`].
    #[derive(Debug, PartialEq)]
    pub(super) struct KeySchema;

    impl Schema<Key> for KeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;
        type Decoder = SimpleColumnarDecoder<Key>;
        type Encoder = SimpleColumnarEncoder<Key>;

        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}
1226
// Tests live in a sibling file to keep this module focused.
#[cfg(test)]
#[path = "builtin_schema_migration_tests.rs"]
mod tests;