1use std::collections::{BTreeMap, BTreeSet};
32use std::sync::{Arc, LazyLock};
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39 BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40 MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, MZ_STORAGE_USAGE_BY_SHARD,
41 MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
42};
43use mz_catalog::config::BuiltinItemMigrationConfig;
44use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
45use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
46use mz_catalog::memory::error::{Error, ErrorKind};
47use mz_ore::soft_assert_or_log;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
49use mz_persist_client::critical::{Opaque, SinceHandle};
50use mz_persist_client::read::ReadHandle;
51use mz_persist_client::schema::CaESchema;
52use mz_persist_client::write::WriteHandle;
53use mz_persist_client::{Diagnostics, PersistClient};
54use mz_persist_types::ShardId;
55use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
56use mz_persist_types::schema::backward_compatible;
57use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
58use mz_repr::{CatalogItemId, GlobalId, Timestamp};
59use mz_sql::catalog::{CatalogItemType, NameReference};
60use mz_storage_client::controller::StorageTxn;
61use mz_storage_types::StorageDiff;
62use mz_storage_types::sources::SourceData;
63use semver::Version;
64use timely::progress::Antichain;
65use tracing::{debug, info};
66
67use crate::catalog::migrate::get_migration_version;
68
/// The ordered list of builtin schema migration steps.
///
/// Each step names a builtin storage collection whose schema changed in the
/// given version, together with the mechanism used to migrate it (in-place
/// evolution vs. shard replacement). Steps whose version is greater than the
/// catalog's last-seen version are applied on upgrade; see
/// `Migration::plan_migration`.
///
/// NOTE(review): steps appear in non-decreasing version order; presumably new
/// steps should only be appended — confirm before reordering.
static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
    vec![
        MigrationStep::replacement(
            "0.149.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_sink_statistics_raw",
        ),
        MigrationStep::replacement(
            "0.149.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_source_statistics_raw",
        ),
        MigrationStep::evolution(
            "0.159.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_cluster_replica_metrics_history",
        ),
        MigrationStep::replacement(
            "0.160.0",
            CatalogItemType::Table,
            MZ_CATALOG_SCHEMA,
            "mz_roles",
        ),
        MigrationStep::replacement(
            "0.160.0",
            CatalogItemType::Table,
            MZ_CATALOG_SCHEMA,
            "mz_sinks",
        ),
        MigrationStep::replacement(
            "26.18.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_databases",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_schemas",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_role_members",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_network_policies",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_network_policy_rules",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_cluster_workload_classes",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_internal_cluster_replicas",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_pending_cluster_replicas",
        ),
        MigrationStep::replacement(
            "26.20.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_materialized_views",
        ),
        MigrationStep::replacement(
            "26.22.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_connections",
        ),
        MigrationStep::replacement(
            "26.22.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_secrets",
        ),
    ]
});
176
/// A single builtin schema migration step: one builtin object migrated by one
/// mechanism at one version.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The version in which the builtin's schema changed.
    version: Version,
    /// Identifies the builtin object (schema, type, name) to migrate.
    object: SystemObjectDescription,
    /// How the migration is performed; see [`Mechanism`].
    mechanism: Mechanism,
}
187
188impl MigrationStep {
189 fn evolution(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
191 Self {
192 version: Version::parse(version).expect("valid"),
193 object: SystemObjectDescription {
194 schema_name: schema.into(),
195 object_type: type_,
196 object_name: name.into(),
197 },
198 mechanism: Mechanism::Evolution,
199 }
200 }
201
202 fn replacement(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
204 Self {
205 version: Version::parse(version).expect("valid"),
206 object: SystemObjectDescription {
207 schema_name: schema.into(),
208 object_type: type_,
209 object_name: name.into(),
210 },
211 mechanism: Mechanism::Replacement,
212 }
213 }
214}
215
/// The mechanism by which a builtin schema migration is performed.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[allow(dead_code)]
enum Mechanism {
    /// Evolve the schema of the existing persist shard in place, keeping its
    /// contents.
    Evolution,
    /// Allocate a fresh shard for the new schema, replacing the old one.
    Replacement,
}
230
/// The outcome of a builtin schema migration, as reported to the caller.
pub(super) struct MigrationResult {
    /// Catalog IDs of builtin items whose backing shards were replaced.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// A deferred action (e.g. migration-shard retractions) that the caller
    /// must run after the catalog transaction commits.
    pub cleanup_action: BoxFuture<'static, ()>,
}
238
239impl Default for MigrationResult {
240 fn default() -> Self {
241 Self {
242 replaced_items: Default::default(),
243 cleanup_action: async {}.boxed(),
244 }
245 }
246}
247
/// Runs the builtin schema migration against the given catalog transaction.
///
/// Gathers the current system-object state from `txn`, executes the
/// [`MIGRATIONS`] steps, applies the resulting shard/fingerprint changes back
/// to `txn`, and returns the set of replaced items plus a deferred cleanup
/// action.
///
/// # Panics
///
/// Panics if `config.read_only` does not match the transaction's savepoint
/// status, or if a durable system-object mapping has no corresponding entry
/// in `BUILTIN_LOOKUP`.
pub(super) async fn run(
    build_info: &BuildInfo,
    deploy_generation: u64,
    txn: &mut Transaction<'_>,
    config: BuiltinItemMigrationConfig,
) -> Result<MigrationResult, Error> {
    // Read-only mode must operate on a savepoint transaction, and vice versa.
    assert_eq!(config.read_only, txn.is_savepoint());

    // Dummy build info carries no meaningful version; skip entirely
    // (presumably only hit in tests -- TODO confirm).
    if *build_info == DUMMY_BUILD_INFO {
        return Ok(MigrationResult::default());
    }

    // No recorded migration version means there is nothing to migrate from.
    let Some(durable_version) = get_migration_version(txn) else {
        return Ok(MigrationResult::default());
    };
    let build_version = build_info.semver_version();

    // Build a snapshot of every durable system object, joined with its shard
    // ID (if any) and its static builtin definition.
    let collection_metadata = txn.get_collection_metadata();
    let system_objects = txn
        .get_system_object_mappings()
        .map(|m| {
            let object = m.description;
            let global_id = m.unique_identifier.global_id;
            let shard_id = collection_metadata.get(&global_id).copied();
            let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
                panic!("missing builtin {object:?}");
            };
            let info = ObjectInfo {
                global_id,
                shard_id,
                builtin,
                fingerprint: m.unique_identifier.fingerprint,
            };
            (object, info)
        })
        .collect();

    let migration_shard = txn.get_builtin_migration_shard().expect("must exist");

    let migration = Migration {
        source_version: durable_version.clone(),
        target_version: build_version.clone(),
        deploy_generation,
        system_objects,
        migration_shard,
        config,
    };

    // Wrap any migration failure in a catalog error that records both the
    // source and target versions.
    let result = migration.run(&MIGRATIONS).await.map_err(|e| {
        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
            last_seen_version: durable_version.to_string(),
            this_version: build_version.to_string(),
            cause: e.to_string(),
        })
    })?;

    // Write the new shards/fingerprints into the transaction.
    result.apply(txn);

    // Translate replaced global IDs back into catalog item IDs for the caller.
    let replaced_items = txn
        .get_system_object_mappings()
        .map(|m| m.unique_identifier)
        .filter(|ids| result.new_shards.contains_key(&ids.global_id))
        .map(|ids| ids.catalog_id)
        .collect();

    Ok(MigrationResult {
        replaced_items,
        cleanup_action: result.cleanup_action,
    })
}
327
/// The internal outcome of [`Migration::run`], to be applied to a catalog
/// transaction via [`MigrationRunResult::apply`].
struct MigrationRunResult {
    /// New backing shards for replaced collections, keyed by global ID.
    new_shards: BTreeMap<GlobalId, ShardId>,
    /// Updated fingerprints for migrated (or ephemeral) builtins.
    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
    /// Stale shards from previous migrations that should be finalized.
    shards_to_finalize: BTreeSet<ShardId>,
    /// A deferred action to run after the catalog transaction commits.
    cleanup_action: BoxFuture<'static, ()>,
}
335
336impl Default for MigrationRunResult {
337 fn default() -> Self {
338 Self {
339 new_shards: BTreeMap::new(),
340 new_fingerprints: BTreeMap::new(),
341 shards_to_finalize: BTreeSet::new(),
342 cleanup_action: async {}.boxed(),
343 }
344 }
345}
346
impl MigrationRunResult {
    /// Applies this migration result to the given catalog transaction:
    /// swaps in the new collection shards, marks the displaced shards as
    /// unfinalized, and rewrites fingerprints for migrated objects.
    fn apply(&self, txn: &mut Transaction<'_>) {
        // Replace the collection metadata of migrated collections. Deleting
        // first makes the subsequent insert of the same keys unique.
        let replaced_ids = self.new_shards.keys().copied().collect();
        let old_metadata = txn.delete_collection_metadata(replaced_ids);
        txn.insert_collection_metadata(self.new_shards.clone())
            .expect("inserting unique shards IDs after deleting existing entries");

        // The displaced shards, plus any stale shards found during cleanup,
        // are recorded for later finalization.
        let mut unfinalized_shards: BTreeSet<_> =
            old_metadata.into_iter().map(|(_, sid)| sid).collect();
        unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
        txn.insert_unfinalized_shards(unfinalized_shards)
            .expect("cannot fail");

        // Rewrite only the mappings that have a new fingerprint, keeping
        // their catalog and global IDs intact.
        let mappings = txn
            .get_system_object_mappings()
            .filter_map(|m| {
                let fingerprint = self.new_fingerprints.get(&m.description)?;
                Some(SystemObjectMapping {
                    description: m.description,
                    unique_identifier: SystemObjectUniqueIdentifier {
                        catalog_id: m.unique_identifier.catalog_id,
                        global_id: m.unique_identifier.global_id,
                        fingerprint: fingerprint.clone(),
                    },
                })
            })
            .collect();
        txn.set_system_object_mappings(mappings)
            .expect("filtered existing mappings remain unique");
    }
}
382
/// Per-object state gathered from the durable catalog for one builtin.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// The object's durable global ID.
    global_id: GlobalId,
    /// The backing persist shard, if the object is a storage collection with
    /// recorded collection metadata.
    shard_id: Option<ShardId>,
    /// The static builtin definition for this object.
    builtin: &'static Builtin<NameReference>,
    /// The fingerprint currently recorded in the durable catalog.
    fingerprint: String,
}
391
/// All state required to plan and execute a builtin schema migration.
struct Migration {
    /// The version recorded in the durable catalog (what we migrate from).
    source_version: Version,
    /// The version of the running build (what we migrate to).
    target_version: Version,
    /// The current deploy generation; used to key migration-shard entries.
    deploy_generation: u64,
    /// Snapshot of all durable system objects, keyed by description.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// The persist shard used to coordinate replacement migrations across
    /// processes.
    migration_shard: ShardId,
    /// Configuration (persist client, read-only flag, force overrides).
    config: BuiltinItemMigrationConfig,
}
412
impl Migration {
    /// Executes the migration described by `self`, applying all `steps` whose
    /// version is newer than `source_version`.
    ///
    /// Returns the shard and fingerprint changes the caller must apply to the
    /// catalog transaction, plus a deferred cleanup action.
    async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
        info!(
            deploy_generation = %self.deploy_generation,
            "running builtin schema migration: {} -> {}",
            self.source_version, self.target_version
        );

        // Panics on malformed steps; these are programming errors.
        self.validate_migration_steps(steps);

        // When upgrading from a `dev` pre-release version with no explicit
        // force configured, default to forcing an evolution of all builtins.
        // NOTE(review): presumably because dev builds can change builtin
        // schemas without a version bump -- confirm.
        let force_migration = if self.source_version != self.target_version
            && self.source_version.pre.as_str().starts_with("dev")
            && self.config.force_migration.is_none()
        {
            Some("evolution".to_string())
        } else {
            self.config.force_migration.clone()
        };

        let (force, plan) = match force_migration.as_deref() {
            None => (false, self.plan_migration(steps)),
            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
            Some(other) => panic!("unknown force migration mechanism: {other}"),
        };

        if self.source_version == self.target_version && !force {
            info!("skipping migration: already at target version");
            return Ok(MigrationRunResult::default());
        } else if self.source_version > self.target_version {
            bail!("downgrade not supported");
        }

        // Only writable processes may bump the migration shard's codec
        // version; read-only (savepoint) processes must not write.
        if !self.config.read_only {
            self.upgrade_migration_shard_version().await;
        }

        info!("executing migration plan: {plan:?}");

        self.migrate_evolve(&plan.evolve).await?;
        let new_shards = self.migrate_replace(&plan.replace).await?;

        let mut migrated_objects = BTreeSet::new();
        migrated_objects.extend(plan.evolve);
        migrated_objects.extend(plan.replace);

        let new_fingerprints = self.update_fingerprints(&migrated_objects)?;

        let (shards_to_finalize, cleanup_action) = self.cleanup().await?;

        Ok(MigrationRunResult {
            new_shards,
            new_fingerprints,
            shards_to_finalize,
            cleanup_action,
        })
    }

    /// Asserts that every step targets a valid, migratable builtin.
    ///
    /// # Panics
    ///
    /// Panics on any invalid step: a version beyond the target, a protected
    /// object, a non-existent builtin, or a non-storage builtin type.
    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
        for step in steps {
            assert!(
                step.version <= self.target_version,
                "migration step version greater than target version: {} > {}",
                step.version,
                self.target_version,
            );

            let object = &step.object;

            // These collections must never be migrated; replacing their
            // shards would lose their accumulated contents.
            assert_ne!(
                &*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
            );

            assert_ne!(
                &*MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, object,
                "mz_object_arrangement_size_history cannot be migrated or else the table will be truncated"
            );

            assert_ne!(
                &*MZ_CATALOG_RAW_DESCRIPTION, object,
                "mz_catalog_raw cannot be migrated"
            );

            let Some(object_info) = self.system_objects.get(object) else {
                panic!("migration step for non-existent builtin: {object:?}");
            };

            // Only storage-backed builtins have a shard to migrate.
            let builtin = object_info.builtin;
            use Builtin::*;
            assert!(
                matches!(builtin, Table(..) | Source(..) | MaterializedView(..)),
                "schema migration not supported for builtin: {builtin:?}",
            );
        }
    }

    /// Builds the migration plan: all steps newer than the source version,
    /// deduplicated per object with `Replacement` taking precedence over
    /// `Evolution` (a replacement subsumes any evolution).
    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
        let steps = steps.iter().filter(|s| s.version > self.source_version);

        // Collapse multiple steps for the same object into one mechanism.
        let mut by_object = BTreeMap::new();
        for step in steps {
            if let Some(entry) = by_object.get_mut(&step.object) {
                *entry = match (step.mechanism, *entry) {
                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
                        Mechanism::Replacement
                    }
                };
            } else {
                by_object.insert(step.object.clone(), step.mechanism);
            }
        }

        let mut plan = Plan::default();
        for (object, mechanism) in by_object {
            match mechanism {
                Mechanism::Evolution => plan.evolve.push(object),
                Mechanism::Replacement => plan.replace.push(object),
            }
        }

        plan
    }

    /// Builds a plan that migrates *all* storage-backed builtins with the
    /// given mechanism, excluding the protected collections.
    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
        let objects = self
            .system_objects
            .iter()
            .filter(|(_, info)| {
                use Builtin::*;
                match info.builtin {
                    // mz_storage_usage_by_shard must keep its contents.
                    Table(table) => **table != *MZ_STORAGE_USAGE_BY_SHARD,
                    MaterializedView(..) => true,
                    Source(source) => **source != *MZ_CATALOG_RAW,
                    // Non-storage builtins have no shard to migrate.
                    Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
                }
            })
            .map(|(object, _)| object.clone())
            .collect();

        let mut plan = Plan::default();
        match mechanism {
            Mechanism::Evolution => plan.evolve = objects,
            Mechanism::Replacement => plan.replace = objects,
        }

        plan
    }

    /// Upgrades the migration shard's persisted codec version to the current
    /// build, so subsequent opens at this version succeed.
    async fn upgrade_migration_shard_version(&self) {
        let persist = &self.config.persist_client;
        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
        };

        persist
            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
                self.migration_shard,
                diagnostics,
            )
            .await
            .expect("valid usage");
    }

    /// Migrates each of the given objects by in-place schema evolution.
    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
        for object in objects {
            self.migrate_evolve_one(object).await?;
        }
        Ok(())
    }

    /// Evolves a single object's persist schema to the builtin's current
    /// schema, via compare-and-evolve-schema (CaES).
    ///
    /// In read-only mode this only *checks* that the evolution would be
    /// backward compatible; no persist state is modified.
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let persist = &self.config.persist_client;

        let Some(object_info) = self.system_objects.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let id = object_info.global_id;

        // A missing shard ID is an error in read-only mode; otherwise skip
        // this object. NOTE(review): presumably the shard is created later
        // with the target schema in writable mode -- confirm.
        let Some(shard_id) = object_info.shard_id else {
            if self.config.read_only {
                bail!("missing shard ID for builtin {object:?} ({id})");
            } else {
                return Ok(());
            }
        };

        let target_desc = match object_info.builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::MaterializedView(mv) => &mv.desc,
            _ => bail!("not a storage collection: {object:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        // Read-only mode: verify compatibility only, then stop.
        if self.config.read_only {
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                         {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        // Determine the expected (current) schema for the CaES call,
        // registering the target schema directly if none exists yet.
        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                // Registration succeeded: the shard now has the target
                // schema and there is nothing left to evolve.
                if schema_id.is_some() {
                    return Ok(());
                }

                // Someone raced us and registered a schema; fetch it and
                // fall through to the CaES loop below.
                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        // Retry CaES until it succeeds or proves incompatible; on an
        // expected-schema mismatch, retry with the observed schema.
        loop {
            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                     {source_desc:?} -> {target_desc:?}"
                ),
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }

    /// Migrates the given objects by shard replacement, returning the new
    /// shard ID for each replaced global ID.
    ///
    /// Replacement shards are coordinated through the migration shard so that
    /// concurrent processes of the same version/generation agree on them.
    async fn migrate_replace(
        &self,
        objects: &[SystemObjectDescription],
    ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
        if objects.is_empty() {
            return Ok(Default::default());
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;

        let mut ids_to_replace = BTreeSet::new();
        for object in objects {
            if let Some(info) = self.system_objects.get(object) {
                ids_to_replace.insert(info.global_id);
            } else {
                bail!("missing id for builtin {object:?}");
            }
        }

        info!(?objects, ?ids_to_replace, "migrating by replacement");

        // Retry until the compare-and-append race is won (or the entries are
        // found to already exist).
        let replaced_shards = loop {
            if let Some(shards) = self
                .try_get_or_insert_replacement_shards(
                    &ids_to_replace,
                    &mut persist_write,
                    &mut persist_read,
                )
                .await?
            {
                break shards;
            }
        };

        Ok(replaced_shards)
    }

    /// One attempt to read existing replacement-shard entries for the current
    /// version/generation from the migration shard, minting and inserting new
    /// entries for any IDs not yet present.
    ///
    /// Returns `None` if the compare-and-append lost a race and the whole
    /// attempt should be retried.
    async fn try_get_or_insert_replacement_shards(
        &self,
        ids_to_replace: &BTreeSet<GlobalId>,
        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
        let upper = persist_write.fetch_recent_upper().await;
        let write_ts = *upper.as_option().expect("migration shard not sealed");

        let mut ids_to_replace = ids_to_replace.clone();
        let mut replaced_shards = BTreeMap::new();

        // If the shard has any data (upper > minimum), first collect entries
        // another process may already have written for this exact version and
        // deploy generation.
        if let Some(read_ts) = write_ts.step_back() {
            let pred = |key: &migration_shard::Key| {
                key.build_version == self.target_version
                    && key.deploy_generation == Some(self.deploy_generation)
            };
            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
                for (key, shard_id) in entries {
                    let id = GlobalId::System(key.global_id);
                    if ids_to_replace.remove(&id) {
                        replaced_shards.insert(id, shard_id);
                    }
                }

                debug!(
                    %read_ts, ?replaced_shards, ?ids_to_replace,
                    "found existing entries in migration shard",
                );
            }

            // Everything was already recorded; no write needed.
            if ids_to_replace.is_empty() {
                return Ok(Some(replaced_shards));
            }
        }

        // Mint fresh shard IDs for the remaining objects and stage inserts.
        let mut updates = Vec::new();
        for id in ids_to_replace {
            let shard_id = ShardId::new();
            replaced_shards.insert(id, shard_id);

            let GlobalId::System(global_id) = id else {
                bail!("attempt to migrate a non-system collection: {id}");
            };
            let key = migration_shard::Key {
                global_id,
                build_version: self.target_version.clone(),
                deploy_generation: Some(self.deploy_generation),
            };
            updates.push(((key, shard_id), write_ts, 1));
        }

        // Compare-and-append at the observed upper; a mismatch means another
        // process wrote concurrently and the caller must retry.
        let upper = Antichain::from_elem(write_ts);
        let new_upper = Antichain::from_elem(write_ts.step_forward());
        debug!(%write_ts, "attempting insert into migration shard");
        let result = persist_write
            .compare_and_append(updates, upper, new_upper)
            .await
            .expect("valid usage");

        match result {
            Ok(()) => {
                debug!(
                    %write_ts, ?replaced_shards,
                    "successfully inserted into migration shard"
                );
                Ok(Some(replaced_shards))
            }
            Err(_mismatch) => Ok(None),
        }
    }

    /// Opens write and read handles on the migration shard.
    async fn open_migration_shard(
        &self,
        diagnostics: Diagnostics,
    ) -> (
        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) {
        let persist = &self.config.persist_client;

        persist
            .open(
                self.migration_shard,
                Arc::new(migration_shard::KeySchema),
                Arc::new(ShardIdSchema),
                diagnostics,
                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
            )
            .await
            .expect("valid usage")
    }

    /// Opens the critical since handle on the migration shard, used to
    /// downgrade its since during cleanup.
    async fn open_migration_shard_since(
        &self,
        diagnostics: Diagnostics,
    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
        self.config
            .persist_client
            .open_critical_since(
                self.migration_shard,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Opaque::encode(&i64::MIN),
                diagnostics.clone(),
            )
            .await
            .expect("valid usage")
    }

    /// Computes the new fingerprints to record for objects whose builtin
    /// definition no longer matches the durable fingerprint.
    ///
    /// # Panics
    ///
    /// Panics if a non-migrated, non-ephemeral, non-runtime-alterable builtin
    /// has a fingerprint mismatch: that indicates a schema change without a
    /// corresponding migration step.
    fn update_fingerprints(
        &self,
        migrated_items: &BTreeSet<SystemObjectDescription>,
    ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
        let mut new_fingerprints = BTreeMap::new();
        for (object, object_info) in &self.system_objects {
            let id = object_info.global_id;
            let builtin = object_info.builtin;

            // Unchanged fingerprints need no update.
            let fingerprint = builtin.fingerprint();
            if fingerprint == object_info.fingerprint {
                continue; }

            let migrated = migrated_items.contains(object);
            // Ephemeral builtins carry no durable state, so their
            // fingerprints can always be refreshed safely.
            let ephemeral = matches!(
                builtin,
                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
            );

            if migrated || ephemeral {
                new_fingerprints.insert(object.clone(), fingerprint);
            } else if builtin.runtime_alterable() {
                // Runtime-alterable builtins must carry the sentinel
                // fingerprint; their definitions live outside this process.
                assert_eq!(
                    object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
                    "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
                );
            } else {
                panic!(
                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
                    fingerprint, object_info.fingerprint,
                );
            }
        }

        Ok(new_fingerprints)
    }

    /// Prepares cleanup of stale migration-shard entries (from versions older
    /// than the target): collects shards eligible for finalization, builds a
    /// deferred retraction action, and downgrades the shard's since.
    ///
    /// The retractions are deferred so they only happen after the catalog
    /// transaction that references the new shards has committed.
    async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
        let noop_action = async {}.boxed();
        let noop_result = (BTreeSet::new(), noop_action);

        // Read-only processes must not mutate the migration shard.
        if self.config.read_only {
            return Ok(noop_result);
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: "builtin schema migration cleanup".into(),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;
        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;

        let upper = persist_write.fetch_recent_upper().await.clone();
        let write_ts = *upper.as_option().expect("migration shard not sealed");
        // An upper at the minimum timestamp means the shard is empty.
        let Some(read_ts) = write_ts.step_back() else {
            return Ok(noop_result);
        };

        // Stale entries are those written by builds older than the target.
        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
        else {
            return Ok(noop_result);
        };

        debug!(
            ?stale_entries,
            "cleaning migration shard up to version {}", self.target_version,
        );

        let current_shards: BTreeMap<_, _> = self
            .system_objects
            .values()
            .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
            .collect();

        let mut shards_to_finalize = BTreeSet::new();
        let mut retractions = Vec::new();
        for (key, shard_id) in stale_entries {
            // Only finalize shards that are no longer the current backing
            // shard of their collection.
            let gid = GlobalId::System(key.global_id);
            if current_shards.get(&gid) != Some(&shard_id) {
                shards_to_finalize.insert(shard_id);
            }

            retractions.push(((key, shard_id), write_ts, -1));
        }

        // Best effort: a compare-and-append mismatch here just means another
        // process raced us; the retractions are not retried.
        let cleanup_action = async move {
            if !retractions.is_empty() {
                let new_upper = Antichain::from_elem(write_ts.step_forward());
                let result = persist_write
                    .compare_and_append(retractions, upper, new_upper)
                    .await
                    .expect("valid usage");
                match result {
                    Ok(()) => debug!("cleaned up migration shard"),
                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
                }
            }
        }
        .boxed();

        // Downgrade the since so compaction can reclaim retracted entries; an
        // opaque mismatch is logged but not fatal.
        let o = persist_since.opaque().clone();
        let new_since = Antichain::from_elem(read_ts);
        let result = persist_since
            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
            .await;
        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");

        Ok((shards_to_finalize, cleanup_action))
    }
}
1064
1065async fn read_migration_shard<P>(
1071 persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
1072 read_ts: Timestamp,
1073 predicate: P,
1074) -> Option<Vec<(migration_shard::Key, ShardId)>>
1075where
1076 P: for<'a> Fn(&migration_shard::Key) -> bool,
1077{
1078 let as_of = Antichain::from_elem(read_ts);
1079 let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
1080
1081 assert!(
1082 updates.iter().all(|(_, _, diff)| *diff == 1),
1083 "migration shard contains invalid diffs: {updates:?}",
1084 );
1085
1086 let entries: Vec<_> = updates
1087 .into_iter()
1088 .map(|(data, _, _)| data)
1089 .filter(move |(key, _)| predicate(key))
1090 .collect();
1091
1092 (!entries.is_empty()).then_some(entries)
1093}
1094
/// A migration plan: which objects to migrate with which mechanism.
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate by in-place schema evolution.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate by shard replacement.
    replace: Vec<SystemObjectDescription>,
}
1103
/// Key type and persist codec for the builtin migration shard.
///
/// The shard maps a [`Key`] (global ID + build version + deploy generation)
/// to the replacement [`mz_persist_types::ShardId`] minted for that object.
mod migration_shard {
    use std::fmt;
    use std::str::FromStr;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;
    use semver::Version;
    use serde::{Deserialize, Serialize};

    /// The key of one migration-shard entry.
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
    pub(super) struct Key {
        /// The raw system global ID of the migrated collection.
        pub(super) global_id: u64,
        /// The build version that minted the replacement shard.
        pub(super) build_version: Version,
        /// The deploy generation that minted the replacement shard.
        /// `None` for entries written in the legacy (pre-generation) format;
        /// see the `Display`/`FromStr` impls below.
        pub(super) deploy_generation: Option<u64>,
    }

    impl fmt::Display for Key {
        // Keys are stored as strings: the current format is JSON; the legacy
        // format (no deploy generation) is "{global_id}-{build_version}".
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            if self.deploy_generation.is_some() {
                let s = serde_json::to_string(self).expect("JSON serializable");
                f.write_str(&s)
            } else {
                write!(f, "{}-{}", self.global_id, self.build_version)
            }
        }
    }

    impl FromStr for Key {
        type Err = String;

        // Accepts both formats produced by `Display`: JSON first, then the
        // legacy "{global_id}-{build_version}" form.
        fn from_str(s: &str) -> Result<Self, String> {
            if let Ok(key) = serde_json::from_str(s) {
                return Ok(key);
            };

            // splitn(2, '-') splits only at the first '-', so version strings
            // containing '-' (e.g. pre-releases) survive intact.
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid Key '{s}'"));
            };
            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
            let build_version = build_version
                .parse::<Version>()
                .map_err(|e| e.to_string())?;
            Ok(Key {
                global_id,
                build_version,
                deploy_generation: None,
            })
        }
    }

    impl Default for Key {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: Version::new(0, 0, 0),
                deploy_generation: Some(0),
            }
        }
    }

    impl Codec for Key {
        type Schema = KeySchema;
        type Storage = ();

        fn codec_name() -> String {
            // NOTE(review): the name looks historical ("TableKey");
            // presumably kept for compatibility with existing shard data --
            // confirm before renaming.
            "TableKey".into()
        }

        // Encode as the string form produced by `Display`.
        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put(self.to_string().as_bytes())
        }

        // Decode via `FromStr`, accepting both key formats.
        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
            s.parse()
        }

        // The schema carries no state, so it encodes to nothing.
        fn encode_schema(_schema: &KeySchema) -> Bytes {
            Bytes::new()
        }

        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            KeySchema
        }
    }

    // Columnar (arrow) representation: each key is stored as its string form.
    impl SimpleColumnarData for Key {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }

        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }

        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("valid Key");
        }
    }

    /// Stateless persist schema for [`Key`].
    #[derive(Debug, PartialEq)]
    pub(super) struct KeySchema;

    impl Schema<Key> for KeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;
        type Decoder = SimpleColumnarDecoder<Key>;
        type Encoder = SimpleColumnarEncoder<Key>;

        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}
1245
// Tests live in a sibling file to keep this module focused.
#[cfg(test)]
#[path = "builtin_schema_migration_tests.rs"]
mod tests;