1use std::collections::{BTreeMap, BTreeSet};
32use std::sync::{Arc, LazyLock};
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39 BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40 MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
41};
42use mz_catalog::config::BuiltinItemMigrationConfig;
43use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
44use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
45use mz_catalog::memory::error::{Error, ErrorKind};
46use mz_ore::soft_assert_or_log;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
48use mz_persist_client::critical::{Opaque, SinceHandle};
49use mz_persist_client::read::ReadHandle;
50use mz_persist_client::schema::CaESchema;
51use mz_persist_client::write::WriteHandle;
52use mz_persist_client::{Diagnostics, PersistClient};
53use mz_persist_types::ShardId;
54use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
55use mz_persist_types::schema::backward_compatible;
56use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
57use mz_repr::{CatalogItemId, GlobalId, Timestamp};
58use mz_sql::catalog::{CatalogItemType, NameReference};
59use mz_storage_client::controller::StorageTxn;
60use mz_storage_types::StorageDiff;
61use mz_storage_types::sources::SourceData;
62use semver::Version;
63use timely::progress::Antichain;
64use tracing::{debug, info};
65
66use crate::catalog::migrate::get_migration_version;
67
/// The ordered list of builtin schema migrations.
///
/// Each step names a builtin object and the mechanism used to migrate its
/// backing storage collection: in-place persist schema evolution, or
/// replacement with a fresh shard. At plan time, steps whose version is not
/// newer than the durable (source) catalog version are skipped; steps newer
/// than the build's target version are rejected by validation.
static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
    vec![
        MigrationStep::replacement(
            "0.149.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_sink_statistics_raw",
        ),
        MigrationStep::replacement(
            "0.149.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_source_statistics_raw",
        ),
        MigrationStep::evolution(
            "0.159.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_cluster_replica_metrics_history",
        ),
        MigrationStep::replacement(
            "0.160.0",
            CatalogItemType::Table,
            MZ_CATALOG_SCHEMA,
            "mz_roles",
        ),
        MigrationStep::replacement(
            "0.160.0",
            CatalogItemType::Table,
            MZ_CATALOG_SCHEMA,
            "mz_sinks",
        ),
        MigrationStep::replacement(
            "26.18.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_databases",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_schemas",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_role_members",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_network_policies",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_network_policy_rules",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_cluster_workload_classes",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_internal_cluster_replicas",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_pending_cluster_replicas",
        ),
    ]
});
157
/// A single builtin schema migration: at `version`, migrate `object` using
/// `mechanism`.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The build version that introduced this migration step.
    version: Version,
    /// The builtin object whose storage collection is migrated.
    object: SystemObjectDescription,
    /// How the object's backing shard is brought to the new schema.
    mechanism: Mechanism,
}
168
169impl MigrationStep {
170 fn evolution(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
172 Self {
173 version: Version::parse(version).expect("valid"),
174 object: SystemObjectDescription {
175 schema_name: schema.into(),
176 object_type: type_,
177 object_name: name.into(),
178 },
179 mechanism: Mechanism::Evolution,
180 }
181 }
182
183 fn replacement(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
185 Self {
186 version: Version::parse(version).expect("valid"),
187 object: SystemObjectDescription {
188 schema_name: schema.into(),
189 object_type: type_,
190 object_name: name.into(),
191 },
192 mechanism: Mechanism::Replacement,
193 }
194 }
195}
196
/// How a builtin's backing persist shard is brought to its new schema.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[allow(dead_code)]
enum Mechanism {
    /// Evolve the schema of the existing shard in place (persist schema
    /// evolution); the shard ID and its contents are retained.
    Evolution,
    /// Allocate a brand-new, empty shard for the collection; the old shard
    /// is retired and eventually finalized.
    Replacement,
}
211
/// The outcome of a builtin schema migration run, as seen by the caller.
pub(super) struct MigrationResult {
    /// Catalog items whose backing shards were replaced with new ones.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// Deferred cleanup work (retracting stale migration-shard entries).
    /// NOTE(review): presumably meant to be awaited after the catalog
    /// transaction commits -- confirm with the caller.
    pub cleanup_action: BoxFuture<'static, ()>,
}
219
220impl Default for MigrationResult {
221 fn default() -> Self {
222 Self {
223 replaced_items: Default::default(),
224 cleanup_action: async {}.boxed(),
225 }
226 }
227}
228
/// Runs any applicable builtin schema migrations against the catalog.
///
/// Skips migration entirely for dummy builds and for catalogs that have no
/// durable migration version. On success, `txn` has been updated with new
/// shard mappings and fingerprints, and the returned result carries the
/// replaced catalog item IDs plus a deferred cleanup action.
///
/// # Errors
///
/// Any failure inside the migration itself is surfaced as
/// `ErrorKind::FailedBuiltinSchemaMigration`, annotated with the source and
/// target versions.
///
/// # Panics
///
/// Panics if `config.read_only` disagrees with `txn.is_savepoint()`, or if a
/// system object mapping refers to a builtin missing from `BUILTIN_LOOKUP`.
pub(super) async fn run(
    build_info: &BuildInfo,
    deploy_generation: u64,
    txn: &mut Transaction<'_>,
    config: BuiltinItemMigrationConfig,
) -> Result<MigrationResult, Error> {
    // Read-only mode and savepoint transactions must go hand in hand.
    assert_eq!(config.read_only, txn.is_savepoint());

    // Dummy builds (e.g. some tests) have no meaningful version to migrate
    // to.
    if *build_info == DUMMY_BUILD_INFO {
        return Ok(MigrationResult::default());
    }

    // Without a durable migration version we cannot tell which steps have
    // already been applied, so do nothing.
    let Some(durable_version) = get_migration_version(txn) else {
        return Ok(MigrationResult::default());
    };
    let build_version = build_info.semver_version();

    // Snapshot all system objects, pairing each with its static builtin
    // definition and, if present, its backing shard.
    let collection_metadata = txn.get_collection_metadata();
    let system_objects = txn
        .get_system_object_mappings()
        .map(|m| {
            let object = m.description;
            let global_id = m.unique_identifier.global_id;
            let shard_id = collection_metadata.get(&global_id).copied();
            let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
                panic!("missing builtin {object:?}");
            };
            let info = ObjectInfo {
                global_id,
                shard_id,
                builtin,
                fingerprint: m.unique_identifier.fingerprint,
            };
            (object, info)
        })
        .collect();

    let migration_shard = txn.get_builtin_migration_shard().expect("must exist");

    let migration = Migration {
        source_version: durable_version.clone(),
        target_version: build_version.clone(),
        deploy_generation,
        system_objects,
        migration_shard,
        config,
    };

    let result = migration.run(&MIGRATIONS).await.map_err(|e| {
        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
            last_seen_version: durable_version.to_string(),
            this_version: build_version.to_string(),
            cause: e.to_string(),
        })
    })?;

    // Record the migration's effects in the catalog transaction.
    result.apply(txn);

    // Translate replaced global IDs into the catalog item IDs the caller
    // works with.
    let replaced_items = txn
        .get_system_object_mappings()
        .map(|m| m.unique_identifier)
        .filter(|ids| result.new_shards.contains_key(&ids.global_id))
        .map(|ids| ids.catalog_id)
        .collect();

    Ok(MigrationResult {
        replaced_items,
        cleanup_action: result.cleanup_action,
    })
}
308
/// The raw outcome of [`Migration::run`], before it is applied to the
/// catalog transaction.
struct MigrationRunResult {
    /// Freshly allocated shards for collections migrated by replacement.
    new_shards: BTreeMap<GlobalId, ShardId>,
    /// Updated fingerprints for builtins whose definitions changed.
    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
    /// Shards referenced by stale migration-shard entries that are no
    /// longer in use and should be finalized.
    shards_to_finalize: BTreeSet<ShardId>,
    /// Deferred retraction of stale migration-shard entries.
    cleanup_action: BoxFuture<'static, ()>,
}
316
317impl Default for MigrationRunResult {
318 fn default() -> Self {
319 Self {
320 new_shards: BTreeMap::new(),
321 new_fingerprints: BTreeMap::new(),
322 shards_to_finalize: BTreeSet::new(),
323 cleanup_action: async {}.boxed(),
324 }
325 }
326}
327
impl MigrationRunResult {
    /// Records this result in the catalog transaction: swaps in the new
    /// shard IDs for replaced collections, marks the displaced shards (plus
    /// any shards flagged for finalization) as unfinalized, and rewrites
    /// the fingerprints of migrated system objects.
    fn apply(&self, txn: &mut Transaction<'_>) {
        let replaced_ids = self.new_shards.keys().copied().collect();
        // Delete the old mappings before inserting the replacements, so the
        // insert sees no duplicate global IDs.
        let old_metadata = txn.delete_collection_metadata(replaced_ids);
        txn.insert_collection_metadata(self.new_shards.clone())
            .expect("inserting unique shards IDs after deleting existing entries");

        // Both the shards we just displaced and any stale shards found
        // during cleanup must eventually be finalized.
        let mut unfinalized_shards: BTreeSet<_> =
            old_metadata.into_iter().map(|(_, sid)| sid).collect();
        unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
        txn.insert_unfinalized_shards(unfinalized_shards)
            .expect("cannot fail");

        // Rewrite only the mappings that have a new fingerprint; catalog and
        // global IDs are preserved.
        let mappings = txn
            .get_system_object_mappings()
            .filter_map(|m| {
                let fingerprint = self.new_fingerprints.get(&m.description)?;
                Some(SystemObjectMapping {
                    description: m.description,
                    unique_identifier: SystemObjectUniqueIdentifier {
                        catalog_id: m.unique_identifier.catalog_id,
                        global_id: m.unique_identifier.global_id,
                        fingerprint: fingerprint.clone(),
                    },
                })
            })
            .collect();
        txn.set_system_object_mappings(mappings)
            .expect("filtered existing mappings remain unique");
    }
}
363
/// Everything the migration needs to know about one existing builtin.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// The collection's global ID.
    global_id: GlobalId,
    /// The persist shard backing the collection, if one is recorded in the
    /// catalog's collection metadata.
    shard_id: Option<ShardId>,
    /// The static definition of the builtin.
    builtin: &'static Builtin<NameReference>,
    /// The fingerprint currently recorded in the durable catalog.
    fingerprint: String,
}
372
/// State and configuration for one builtin schema migration run.
struct Migration {
    /// The version recorded in the durable catalog (what we migrate from).
    source_version: Version,
    /// The version of this build (what we migrate to).
    target_version: Version,
    /// The deploy generation of this process.
    deploy_generation: u64,
    /// All system objects currently recorded in the catalog.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// The persist shard used to coordinate shard replacements across
    /// processes.
    migration_shard: ShardId,
    /// Caller-provided configuration (persist client, read-only flag, force
    /// override, ...).
    config: BuiltinItemMigrationConfig,
}
393
impl Migration {
    /// Executes the migration: validates and plans the given steps, performs
    /// schema evolutions and shard replacements, recomputes fingerprints for
    /// migrated objects, and prepares cleanup of stale migration-shard
    /// entries.
    ///
    /// Returns an empty result when the source and target versions are equal
    /// (unless a migration is forced via config) and bails on downgrades.
    async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
        info!(
            deploy_generation = %self.deploy_generation,
            "running builtin schema migration: {} -> {}",
            self.source_version, self.target_version
        );

        self.validate_migration_steps(steps);

        // `force_migration` overrides normal planning and migrates all
        // eligible builtins with a single mechanism.
        let (force, plan) = match self.config.force_migration.as_deref() {
            None => (false, self.plan_migration(steps)),
            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
            Some(other) => panic!("unknown force migration mechanism: {other}"),
        };

        if self.source_version == self.target_version && !force {
            info!("skipping migration: already at target version");
            return Ok(MigrationRunResult::default());
        } else if self.source_version > self.target_version {
            bail!("downgrade not supported");
        }

        // Upgrading the shard's persist version writes to persist, which
        // read-only processes must not do.
        if !self.config.read_only {
            self.upgrade_migration_shard_version().await;
        }

        info!("executing migration plan: {plan:?}");

        self.migrate_evolve(&plan.evolve).await?;
        let new_shards = self.migrate_replace(&plan.replace).await?;

        let mut migrated_objects = BTreeSet::new();
        migrated_objects.extend(plan.evolve);
        migrated_objects.extend(plan.replace);

        let new_fingerprints = self.update_fingerprints(&migrated_objects)?;

        let (shards_to_finalize, cleanup_action) = self.cleanup().await?;

        Ok(MigrationRunResult {
            new_shards,
            new_fingerprints,
            shards_to_finalize,
            cleanup_action,
        })
    }

    /// Panics unless every migration step is well-formed: its version must
    /// not exceed the target version, it must not name
    /// `mz_storage_usage_by_shard` or `mz_catalog_raw`, it must refer to an
    /// existing builtin, and that builtin must be a storage collection
    /// (table, source, materialized view, or continual task).
    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
        for step in steps {
            assert!(
                step.version <= self.target_version,
                "migration step version greater than target version: {} > {}",
                step.version,
                self.target_version,
            );

            let object = &step.object;

            assert_ne!(
                &*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
            );

            assert_ne!(
                &*MZ_CATALOG_RAW_DESCRIPTION, object,
                "mz_catalog_raw cannot be migrated"
            );

            let Some(object_info) = self.system_objects.get(object) else {
                panic!("migration step for non-existent builtin: {object:?}");
            };

            let builtin = object_info.builtin;
            use Builtin::*;
            assert!(
                matches!(
                    builtin,
                    Table(..) | Source(..) | MaterializedView(..) | ContinualTask(..)
                ),
                "schema migration not supported for builtin: {builtin:?}",
            );
        }
    }

    /// Builds a [`Plan`] from the steps whose version is newer than the
    /// source version. When several steps target the same object, a single
    /// replacement wins over any number of evolutions.
    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
        let steps = steps.iter().filter(|s| s.version > self.source_version);

        // Collapse multiple steps per object into one effective mechanism.
        let mut by_object = BTreeMap::new();
        for step in steps {
            if let Some(entry) = by_object.get_mut(&step.object) {
                *entry = match (step.mechanism, *entry) {
                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
                        Mechanism::Replacement
                    }
                };
            } else {
                by_object.insert(step.object.clone(), step.mechanism);
            }
        }

        let mut plan = Plan::default();
        for (object, mechanism) in by_object {
            match mechanism {
                Mechanism::Evolution => plan.evolve.push(object),
                Mechanism::Replacement => plan.replace.push(object),
            }
        }

        plan
    }

    /// Builds a [`Plan`] that migrates every migratable builtin storage
    /// collection (all tables, materialized views, continual tasks, and
    /// sources other than `mz_catalog_raw`) with the given mechanism.
    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
        let objects = self
            .system_objects
            .iter()
            .filter(|(_, info)| {
                use Builtin::*;
                match info.builtin {
                    Table(..) | MaterializedView(..) | ContinualTask(..) => true,
                    Source(source) => **source != *MZ_CATALOG_RAW,
                    Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
                }
            })
            .map(|(object, _)| object.clone())
            .collect();

        let mut plan = Plan::default();
        match mechanism {
            Mechanism::Evolution => plan.evolve = objects,
            Mechanism::Replacement => plan.replace = objects,
        }

        plan
    }

    /// Upgrades the migration shard's persist version to this build.
    /// Only invoked for read-write processes (see [`Self::run`]).
    async fn upgrade_migration_shard_version(&self) {
        let persist = &self.config.persist_client;
        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
        };

        persist
            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
                self.migration_shard,
                diagnostics,
            )
            .await
            .expect("valid usage");
    }

    /// Migrates the given objects one at a time via persist schema
    /// evolution.
    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
        for object in objects {
            self.migrate_evolve_one(object).await?;
        }
        Ok(())
    }

    /// Evolves the persist schema of a single builtin's shard to the schema
    /// declared in the builtin's definition.
    ///
    /// In read-only mode this only checks that the evolution would be
    /// backward compatible, without writing anything. Otherwise it registers
    /// an initial schema if none exists yet and then runs a
    /// compare-and-evolve-schema (CaES) loop until the target schema is
    /// installed or found to be incompatible.
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let persist = &self.config.persist_client;

        let Some(object_info) = self.system_objects.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let id = object_info.global_id;

        let Some(shard_id) = object_info.shard_id else {
            // Without a shard there is nothing to evolve. In read-only mode
            // this is surfaced as an error; otherwise it is tolerated.
            // NOTE(review): presumably a read-write process is expected to
            // create the shard later -- confirm.
            if self.config.read_only {
                bail!("missing shard ID for builtin {object:?} ({id})");
            } else {
                return Ok(());
            }
        };

        let target_desc = match object_info.builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::MaterializedView(mv) => &mv.desc,
            Builtin::ContinualTask(ct) => &ct.desc,
            _ => bail!("not a storage collection: {object:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        if self.config.read_only {
            // Dry run: verify compatibility only; the actual evolution is
            // left to a read-write process.
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                        {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                // A successful registration installed the target schema
                // directly; there is nothing left to evolve.
                if schema_id.is_some() {
                    return Ok(());
                }

                // Registration lost a race with a concurrent writer; re-read
                // the now-existing schema and evolve from there.
                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        // CaES loop: retry on expectation mismatches with the latest schema
        // until the evolution succeeds or turns out to be incompatible.
        loop {
            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                    {source_desc:?} -> {target_desc:?}"
                ),
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }

    /// Allocates replacement shards for the given objects, recording the
    /// mapping in the migration shard so concurrent or restarted processes
    /// for the same target version and deploy generation converge on the
    /// same shard IDs.
    async fn migrate_replace(
        &self,
        objects: &[SystemObjectDescription],
    ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
        if objects.is_empty() {
            return Ok(Default::default());
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;

        let mut ids_to_replace = BTreeSet::new();
        for object in objects {
            if let Some(info) = self.system_objects.get(object) {
                ids_to_replace.insert(info.global_id);
            } else {
                bail!("missing id for builtin {object:?}");
            }
        }

        info!(?objects, ?ids_to_replace, "migrating by replacement");

        // Retry the read-modify-write cycle until our compare-and-append
        // wins (or we find all entries already present).
        let replaced_shards = loop {
            if let Some(shards) = self
                .try_get_or_insert_replacement_shards(
                    &ids_to_replace,
                    &mut persist_write,
                    &mut persist_read,
                )
                .await?
            {
                break shards;
            }
        };

        Ok(replaced_shards)
    }

    /// One attempt of the read-modify-write cycle on the migration shard:
    /// reuse any shard IDs already recorded for this target version and
    /// deploy generation, mint fresh ones for the rest, and try to append
    /// the new entries at the current upper.
    ///
    /// Returns `Ok(None)` when the compare-and-append hits an upper
    /// mismatch; the caller is expected to retry.
    async fn try_get_or_insert_replacement_shards(
        &self,
        ids_to_replace: &BTreeSet<GlobalId>,
        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
        let upper = persist_write.fetch_recent_upper().await;
        let write_ts = *upper.as_option().expect("migration shard not sealed");

        let mut ids_to_replace = ids_to_replace.clone();
        let mut replaced_shards = BTreeMap::new();

        // Read the shard contents just below the upper; a fresh shard (upper
        // at the minimum timestamp) has nothing to read.
        if let Some(read_ts) = write_ts.step_back() {
            // Only entries for our exact target version and deploy
            // generation may be reused.
            let pred = |key: &migration_shard::Key| {
                key.build_version == self.target_version
                    && key.deploy_generation == Some(self.deploy_generation)
            };
            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
                for (key, shard_id) in entries {
                    let id = GlobalId::System(key.global_id);
                    if ids_to_replace.remove(&id) {
                        replaced_shards.insert(id, shard_id);
                    }
                }

                debug!(
                    %read_ts, ?replaced_shards, ?ids_to_replace,
                    "found existing entries in migration shard",
                );
            }

            if ids_to_replace.is_empty() {
                return Ok(Some(replaced_shards));
            }
        }

        // Mint fresh shard IDs for anything not already recorded.
        let mut updates = Vec::new();
        for id in ids_to_replace {
            let shard_id = ShardId::new();
            replaced_shards.insert(id, shard_id);

            let GlobalId::System(global_id) = id else {
                bail!("attempt to migrate a non-system collection: {id}");
            };
            let key = migration_shard::Key {
                global_id,
                build_version: self.target_version.clone(),
                deploy_generation: Some(self.deploy_generation),
            };
            updates.push(((key, shard_id), write_ts, 1));
        }

        let upper = Antichain::from_elem(write_ts);
        let new_upper = Antichain::from_elem(write_ts.step_forward());
        debug!(%write_ts, "attempting insert into migration shard");
        let result = persist_write
            .compare_and_append(updates, upper, new_upper)
            .await
            .expect("valid usage");

        match result {
            Ok(()) => {
                debug!(
                    %write_ts, ?replaced_shards,
                    "successfully inserted into migration shard"
                );
                Ok(Some(replaced_shards))
            }
            // Someone else won the append race; retry from a fresh read.
            Err(_mismatch) => Ok(None),
        }
    }

    /// Opens a write and a read handle on the migration shard.
    async fn open_migration_shard(
        &self,
        diagnostics: Diagnostics,
    ) -> (
        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) {
        let persist = &self.config.persist_client;

        persist
            .open(
                self.migration_shard,
                Arc::new(migration_shard::KeySchema),
                Arc::new(ShardIdSchema),
                diagnostics,
                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
            )
            .await
            .expect("valid usage")
    }

    /// Opens the critical since handle on the migration shard, used by
    /// [`Self::cleanup`] to downgrade the since and thereby allow
    /// compaction.
    async fn open_migration_shard_since(
        &self,
        diagnostics: Diagnostics,
    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
        self.config
            .persist_client
            .open_critical_since(
                self.migration_shard,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Opaque::encode(&i64::MIN),
                diagnostics.clone(),
            )
            .await
            .expect("valid usage")
    }

    /// Computes new fingerprints for builtins whose stored fingerprint
    /// differs from their current definition.
    ///
    /// Only migrated objects and logs/views/indexes may change their
    /// fingerprint here; runtime-alterable builtins must carry the sentinel
    /// fingerprint; any other mismatch indicates a missing migration step
    /// and panics.
    fn update_fingerprints(
        &self,
        migrated_items: &BTreeSet<SystemObjectDescription>,
    ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
        let mut new_fingerprints = BTreeMap::new();
        for (object, object_info) in &self.system_objects {
            let id = object_info.global_id;
            let builtin = object_info.builtin;

            let fingerprint = builtin.fingerprint();
            // Unchanged definition: nothing to update.
            if fingerprint == object_info.fingerprint {
                continue;
            }

            let migrated = migrated_items.contains(object);
            let ephemeral = matches!(
                builtin,
                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
            );

            if migrated || ephemeral {
                new_fingerprints.insert(object.clone(), fingerprint);
            } else if builtin.runtime_alterable() {
                assert_eq!(
                    object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
                    "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
                );
            } else {
                panic!(
                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
                    fingerprint, object_info.fingerprint,
                );
            }
        }

        Ok(new_fingerprints)
    }

    /// Prepares removal of migration-shard entries from older build
    /// versions.
    ///
    /// Returns the shards referenced by stale entries that are not the
    /// current shard of any collection (candidates for finalization), plus a
    /// deferred action that appends the retractions.
    /// NOTE(review): the retraction write is deferred to the returned
    /// future, presumably so it only happens once the catalog changes are
    /// durable -- confirm with the caller. The since of the shard is
    /// downgraded here regardless.
    async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
        let noop_action = async {}.boxed();
        let noop_result = (BTreeSet::new(), noop_action);

        if self.config.read_only {
            return Ok(noop_result);
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: "builtin schema migration cleanup".into(),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;
        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;

        let upper = persist_write.fetch_recent_upper().await.clone();
        let write_ts = *upper.as_option().expect("migration shard not sealed");
        // A fresh shard (upper at the minimum timestamp) has nothing to
        // clean up.
        let Some(read_ts) = write_ts.step_back() else {
            return Ok(noop_result);
        };

        // Entries from builds older than this one are stale.
        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
        else {
            return Ok(noop_result);
        };

        debug!(
            ?stale_entries,
            "cleaning migration shard up to version {}", self.target_version,
        );

        let current_shards: BTreeMap<_, _> = self
            .system_objects
            .values()
            .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
            .collect();

        let mut shards_to_finalize = BTreeSet::new();
        let mut retractions = Vec::new();
        for (key, shard_id) in stale_entries {
            let gid = GlobalId::System(key.global_id);
            // A stale entry's shard may still be the live shard for the
            // collection; only finalize shards no longer in use.
            if current_shards.get(&gid) != Some(&shard_id) {
                shards_to_finalize.insert(shard_id);
            }

            retractions.push(((key, shard_id), write_ts, -1));
        }

        let cleanup_action = async move {
            if !retractions.is_empty() {
                let new_upper = Antichain::from_elem(write_ts.step_forward());
                let result = persist_write
                    .compare_and_append(retractions, upper, new_upper)
                    .await
                    .expect("valid usage");
                match result {
                    Ok(()) => debug!("cleaned up migration shard"),
                    // Best effort: a concurrent writer moved the upper; the
                    // stale entries will be retracted on a later run.
                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
                }
            }
        }
        .boxed();

        // Downgrade the critical since to the read timestamp so the shard
        // can be compacted.
        let o = persist_since.opaque().clone();
        let new_since = Antichain::from_elem(read_ts);
        let result = persist_since
            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
            .await;
        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");

        Ok((shards_to_finalize, cleanup_action))
    }
}
1027
1028async fn read_migration_shard<P>(
1034 persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
1035 read_ts: Timestamp,
1036 predicate: P,
1037) -> Option<Vec<(migration_shard::Key, ShardId)>>
1038where
1039 P: for<'a> Fn(&migration_shard::Key) -> bool,
1040{
1041 let as_of = Antichain::from_elem(read_ts);
1042 let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
1043
1044 assert!(
1045 updates.iter().all(|(_, _, diff)| *diff == 1),
1046 "migration shard contains invalid diffs: {updates:?}",
1047 );
1048
1049 let entries: Vec<_> = updates
1050 .into_iter()
1051 .map(|(data, _, _)| data)
1052 .filter(move |(key, _)| predicate(key))
1053 .collect();
1054
1055 (!entries.is_empty()).then_some(entries)
1056}
1057
/// A planned set of migrations, partitioned by mechanism.
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate via in-place persist schema evolution.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate by allocating a replacement shard.
    replace: Vec<SystemObjectDescription>,
}
1066
/// The key type (and its persist codec/schema) stored in the builtin
/// migration shard, which maps migrated builtin collections to their
/// replacement shard IDs.
mod migration_shard {
    use std::fmt;
    use std::str::FromStr;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;
    use semver::Version;
    use serde::{Deserialize, Serialize};

    /// Key of the migration shard: identifies one migrated builtin
    /// collection for a particular build version and deploy generation.
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
    pub(super) struct Key {
        pub(super) global_id: u64,
        pub(super) build_version: Version,
        /// `None` for legacy keys written in the old
        /// `{global_id}-{build_version}` string format, which carried no
        /// deploy generation.
        pub(super) deploy_generation: Option<u64>,
    }

    impl fmt::Display for Key {
        /// Keys with a deploy generation render as JSON; legacy keys
        /// (without one) keep the old `{global_id}-{build_version}` format.
        /// The matching `FromStr` impl accepts both encodings.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            if self.deploy_generation.is_some() {
                let s = serde_json::to_string(self).expect("JSON serializable");
                f.write_str(&s)
            } else {
                write!(f, "{}-{}", self.global_id, self.build_version)
            }
        }
    }

    impl FromStr for Key {
        type Err = String;

        /// Parses either the JSON encoding or the legacy
        /// `{global_id}-{build_version}` encoding, tried in that order.
        fn from_str(s: &str) -> Result<Self, String> {
            if let Ok(key) = serde_json::from_str(s) {
                return Ok(key);
            };

            // Legacy format: split on the first '-' only, since the version
            // itself may contain '-' (e.g. "26.18.0-dev.0").
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid Key '{s}'"));
            };
            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
            let build_version = build_version
                .parse::<Version>()
                .map_err(|e| e.to_string())?;
            Ok(Key {
                global_id,
                build_version,
                deploy_generation: None,
            })
        }
    }

    impl Default for Key {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: Version::new(0, 0, 0),
                deploy_generation: Some(0),
            }
        }
    }

    impl Codec for Key {
        type Schema = KeySchema;
        type Storage = ();

        fn codec_name() -> String {
            // NOTE(review): the codec name doesn't match the type name;
            // presumably kept for compatibility with data already written
            // under this name -- confirm before changing.
            "TableKey".into()
        }

        fn encode<B: BufMut>(&self, buf: &mut B) {
            // Encoded as the `Display` string; see `fmt` above for the two
            // formats.
            buf.put(self.to_string().as_bytes())
        }

        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
            s.parse()
        }

        fn encode_schema(_schema: &KeySchema) -> Bytes {
            // `KeySchema` is a unit type; there is nothing to encode.
            Bytes::new()
        }

        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            KeySchema
        }
    }

    /// Columnar representation: one UTF-8 string column holding the
    /// `Display` encoding of the key.
    impl SimpleColumnarData for Key {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }

        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }

        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("valid Key");
        }
    }

    /// Persist schema for [`Key`], backed by the string columnar encoding
    /// above.
    #[derive(Debug, PartialEq)]
    pub(super) struct KeySchema;

    impl Schema<Key> for KeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;
        type Decoder = SimpleColumnarDecoder<Key>;
        type Encoder = SimpleColumnarEncoder<Key>;

        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}
1208
// Unit tests live in a sibling file to keep this module focused.
#[cfg(test)]
#[path = "builtin_schema_migration_tests.rs"]
mod tests;