1use std::collections::{BTreeMap, BTreeSet};
32use std::sync::{Arc, LazyLock};
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39 BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40 MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, MZ_STORAGE_USAGE_BY_SHARD,
41 MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
42};
43use mz_catalog::config::BuiltinItemMigrationConfig;
44use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
45use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
46use mz_catalog::memory::error::{Error, ErrorKind};
47use mz_ore::soft_assert_or_log;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
49use mz_persist_client::critical::{Opaque, SinceHandle};
50use mz_persist_client::read::ReadHandle;
51use mz_persist_client::schema::CaESchema;
52use mz_persist_client::write::WriteHandle;
53use mz_persist_client::{Diagnostics, PersistClient};
54use mz_persist_types::ShardId;
55use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
56use mz_persist_types::schema::backward_compatible;
57use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
58use mz_repr::{CatalogItemId, GlobalId, Timestamp};
59use mz_sql::catalog::{CatalogItemType, NameReference};
60use mz_storage_client::controller::StorageTxn;
61use mz_storage_types::StorageDiff;
62use mz_storage_types::sources::SourceData;
63use semver::Version;
64use timely::progress::Antichain;
65use tracing::{debug, info};
66
67use crate::catalog::migrate::get_migration_version;
68
69static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
85 vec![
86 MigrationStep::replacement(
87 "0.149.0",
88 CatalogItemType::Source,
89 MZ_INTERNAL_SCHEMA,
90 "mz_sink_statistics_raw",
91 ),
92 MigrationStep::replacement(
93 "0.149.0",
94 CatalogItemType::Source,
95 MZ_INTERNAL_SCHEMA,
96 "mz_source_statistics_raw",
97 ),
98 MigrationStep::evolution(
99 "0.159.0",
100 CatalogItemType::Source,
101 MZ_INTERNAL_SCHEMA,
102 "mz_cluster_replica_metrics_history",
103 ),
104 MigrationStep::replacement(
105 "0.160.0",
106 CatalogItemType::Table,
107 MZ_CATALOG_SCHEMA,
108 "mz_sinks",
109 ),
110 MigrationStep::replacement(
111 "26.18.0-dev.0",
112 CatalogItemType::MaterializedView,
113 MZ_CATALOG_SCHEMA,
114 "mz_databases",
115 ),
116 MigrationStep::replacement(
117 "26.19.0-dev.0",
118 CatalogItemType::MaterializedView,
119 MZ_CATALOG_SCHEMA,
120 "mz_schemas",
121 ),
122 MigrationStep::replacement(
123 "26.19.0-dev.0",
124 CatalogItemType::MaterializedView,
125 MZ_CATALOG_SCHEMA,
126 "mz_role_members",
127 ),
128 MigrationStep::replacement(
129 "26.19.0-dev.0",
130 CatalogItemType::MaterializedView,
131 MZ_INTERNAL_SCHEMA,
132 "mz_network_policies",
133 ),
134 MigrationStep::replacement(
135 "26.19.0-dev.0",
136 CatalogItemType::MaterializedView,
137 MZ_INTERNAL_SCHEMA,
138 "mz_network_policy_rules",
139 ),
140 MigrationStep::replacement(
141 "26.19.0-dev.0",
142 CatalogItemType::MaterializedView,
143 MZ_INTERNAL_SCHEMA,
144 "mz_cluster_workload_classes",
145 ),
146 MigrationStep::replacement(
147 "26.19.0-dev.0",
148 CatalogItemType::MaterializedView,
149 MZ_INTERNAL_SCHEMA,
150 "mz_internal_cluster_replicas",
151 ),
152 MigrationStep::replacement(
153 "26.19.0-dev.0",
154 CatalogItemType::MaterializedView,
155 MZ_INTERNAL_SCHEMA,
156 "mz_pending_cluster_replicas",
157 ),
158 MigrationStep::replacement(
159 "26.20.0-dev.0",
160 CatalogItemType::MaterializedView,
161 MZ_CATALOG_SCHEMA,
162 "mz_materialized_views",
163 ),
164 MigrationStep::replacement(
165 "26.22.0-dev.0",
166 CatalogItemType::MaterializedView,
167 MZ_CATALOG_SCHEMA,
168 "mz_connections",
169 ),
170 MigrationStep::replacement(
171 "26.22.0-dev.0",
172 CatalogItemType::MaterializedView,
173 MZ_CATALOG_SCHEMA,
174 "mz_secrets",
175 ),
176 MigrationStep::replacement(
177 "26.27.0-dev.0",
178 CatalogItemType::MaterializedView,
179 MZ_CATALOG_SCHEMA,
180 "mz_sources",
181 ),
182 MigrationStep::replacement(
183 "26.29.0-dev.0",
184 CatalogItemType::MaterializedView,
185 MZ_CATALOG_SCHEMA,
186 "mz_indexes",
187 ),
188 MigrationStep::replacement(
189 "26.29.0-dev.0",
190 CatalogItemType::MaterializedView,
191 MZ_CATALOG_SCHEMA,
192 "mz_roles",
193 ),
194 MigrationStep::replacement(
195 "26.29.0-dev.0",
196 CatalogItemType::MaterializedView,
197 MZ_CATALOG_SCHEMA,
198 "mz_role_parameters",
199 ),
200 MigrationStep::replacement(
205 "26.30.0-dev.0",
206 CatalogItemType::MaterializedView,
207 MZ_CATALOG_SCHEMA,
208 "mz_indexes",
209 ),
210 MigrationStep::replacement(
211 "26.30.0-dev.0",
212 CatalogItemType::MaterializedView,
213 MZ_CATALOG_SCHEMA,
214 "mz_clusters",
215 ),
216 MigrationStep::replacement(
217 "26.30.0-dev.0",
218 CatalogItemType::MaterializedView,
219 MZ_CATALOG_SCHEMA,
220 "mz_cluster_replicas",
221 ),
222 MigrationStep::replacement(
223 "26.30.0-dev.0",
224 CatalogItemType::MaterializedView,
225 MZ_INTERNAL_SCHEMA,
226 "mz_cluster_schedules",
227 ),
228 MigrationStep::replacement(
229 "26.30.0-dev.0",
230 CatalogItemType::MaterializedView,
231 MZ_CATALOG_SCHEMA,
232 "mz_default_privileges",
233 ),
234 MigrationStep::replacement(
235 "26.30.0-dev.0",
236 CatalogItemType::MaterializedView,
237 MZ_CATALOG_SCHEMA,
238 "mz_system_privileges",
239 ),
240 ]
241});
242
243#[derive(Clone, Debug)]
245struct MigrationStep {
246 version: Version,
248 object: SystemObjectDescription,
250 mechanism: Mechanism,
252}
253
254impl MigrationStep {
255 fn evolution(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
257 Self {
258 version: Version::parse(version).expect("valid"),
259 object: SystemObjectDescription {
260 schema_name: schema.into(),
261 object_type: type_,
262 object_name: name.into(),
263 },
264 mechanism: Mechanism::Evolution,
265 }
266 }
267
268 fn replacement(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
270 Self {
271 version: Version::parse(version).expect("valid"),
272 object: SystemObjectDescription {
273 schema_name: schema.into(),
274 object_type: type_,
275 object_name: name.into(),
276 },
277 mechanism: Mechanism::Replacement,
278 }
279 }
280}
281
/// The mechanism by which a builtin object is migrated.
///
/// When several planned steps target the same object, `Migration::plan_migration` selects
/// `Replacement` if any of them asks for it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[allow(dead_code)]
enum Mechanism {
    /// Migrate by evolving the schema of the object's existing shard
    /// (handled by `Migration::migrate_evolve`).
    Evolution,
    /// Migrate by assigning the object a new shard (handled by `Migration::migrate_replace`).
    Replacement,
}
296
297pub(super) struct MigrationResult {
299 pub replaced_items: BTreeSet<CatalogItemId>,
301 pub cleanup_action: BoxFuture<'static, ()>,
303}
304
305impl Default for MigrationResult {
306 fn default() -> Self {
307 Self {
308 replaced_items: Default::default(),
309 cleanup_action: async {}.boxed(),
310 }
311 }
312}
313
314pub(super) async fn run(
320 build_info: &BuildInfo,
321 deploy_generation: u64,
322 txn: &mut Transaction<'_>,
323 config: BuiltinItemMigrationConfig,
324) -> Result<MigrationResult, Error> {
325 assert_eq!(config.read_only, txn.is_savepoint());
327
328 if *build_info == DUMMY_BUILD_INFO {
331 return Ok(MigrationResult::default());
332 }
333
334 let Some(durable_version) = get_migration_version(txn) else {
335 return Ok(MigrationResult::default());
337 };
338 let build_version = build_info.semver_version();
339
340 let collection_metadata = txn.get_collection_metadata();
341 let system_objects = txn
342 .get_system_object_mappings()
343 .map(|m| {
344 let object = m.description;
345 let global_id = m.unique_identifier.global_id;
346 let shard_id = collection_metadata.get(&global_id).copied();
347 let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
348 panic!("missing builtin {object:?}");
349 };
350 let info = ObjectInfo {
351 global_id,
352 shard_id,
353 builtin,
354 fingerprint: m.unique_identifier.fingerprint,
355 };
356 (object, info)
357 })
358 .collect();
359
360 let migration_shard = txn.get_builtin_migration_shard().expect("must exist");
361
362 let migration = Migration {
363 source_version: durable_version.clone(),
364 target_version: build_version.clone(),
365 deploy_generation,
366 system_objects,
367 migration_shard,
368 config,
369 };
370
371 let result = migration.run(&MIGRATIONS).await.map_err(|e| {
372 Error::new(ErrorKind::FailedBuiltinSchemaMigration {
373 last_seen_version: durable_version.to_string(),
374 this_version: build_version.to_string(),
375 cause: e.to_string(),
376 })
377 })?;
378
379 result.apply(txn);
380
381 let replaced_items = txn
382 .get_system_object_mappings()
383 .map(|m| m.unique_identifier)
384 .filter(|ids| result.new_shards.contains_key(&ids.global_id))
385 .map(|ids| ids.catalog_id)
386 .collect();
387
388 Ok(MigrationResult {
389 replaced_items,
390 cleanup_action: result.cleanup_action,
391 })
392}
393
394struct MigrationRunResult {
396 new_shards: BTreeMap<GlobalId, ShardId>,
397 new_fingerprints: BTreeMap<SystemObjectDescription, String>,
398 shards_to_finalize: BTreeSet<ShardId>,
399 cleanup_action: BoxFuture<'static, ()>,
400}
401
402impl Default for MigrationRunResult {
403 fn default() -> Self {
404 Self {
405 new_shards: BTreeMap::new(),
406 new_fingerprints: BTreeMap::new(),
407 shards_to_finalize: BTreeSet::new(),
408 cleanup_action: async {}.boxed(),
409 }
410 }
411}
412
413impl MigrationRunResult {
414 fn apply(&self, txn: &mut Transaction<'_>) {
416 let replaced_ids = self.new_shards.keys().copied().collect();
418 let old_metadata = txn.delete_collection_metadata(replaced_ids);
419 txn.insert_collection_metadata(self.new_shards.clone())
420 .expect("inserting unique shards IDs after deleting existing entries");
421
422 let mut unfinalized_shards: BTreeSet<_> =
424 old_metadata.into_iter().map(|(_, sid)| sid).collect();
425 unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
426 txn.insert_unfinalized_shards(unfinalized_shards)
427 .expect("cannot fail");
428
429 let mappings = txn
431 .get_system_object_mappings()
432 .filter_map(|m| {
433 let fingerprint = self.new_fingerprints.get(&m.description)?;
434 Some(SystemObjectMapping {
435 description: m.description,
436 unique_identifier: SystemObjectUniqueIdentifier {
437 catalog_id: m.unique_identifier.catalog_id,
438 global_id: m.unique_identifier.global_id,
439 fingerprint: fingerprint.clone(),
440 },
441 })
442 })
443 .collect();
444 txn.set_system_object_mappings(mappings)
445 .expect("filtered existing mappings remain unique");
446 }
447}
448
/// What the migration knows about a single builtin object, as assembled by [`run`] from the
/// durable catalog state and `BUILTIN_LOOKUP`.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// Global ID recorded in the object's durable system object mapping.
    global_id: GlobalId,
    /// Shard recorded in the collection metadata for `global_id`, if there is one.
    shard_id: Option<ShardId>,
    /// The static builtin definition found in `BUILTIN_LOOKUP`.
    builtin: &'static Builtin<NameReference>,
    /// Fingerprint recorded in the object's durable system object mapping.
    fingerprint: String,
}
457
/// State of a single builtin schema migration run.
struct Migration {
    /// Version to migrate from. [`run`] sets this to the durable migration version.
    source_version: Version,
    /// Version to migrate to. [`run`] sets this to the version of the running build.
    target_version: Version,
    /// Deploy generation of this process; stored in the keys written to the migration shard.
    deploy_generation: u64,
    /// All builtin objects known from the durable catalog, keyed by their description.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// ID of the migration shard used to record replacement shards.
    migration_shard: ShardId,
    /// Migration configuration: persist client, read-only flag, and optional forced mechanism.
    config: BuiltinItemMigrationConfig,
}
478
479impl Migration {
480 async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
481 info!(
482 deploy_generation = %self.deploy_generation,
483 "running builtin schema migration: {} -> {}",
484 self.source_version, self.target_version
485 );
486
487 self.validate_migration_steps(steps);
488
489 let force_migration = if self.source_version != self.target_version
492 && self.source_version.pre.as_str().starts_with("dev")
493 && self.config.force_migration.is_none()
494 {
495 Some("evolution".to_string())
496 } else {
497 self.config.force_migration.clone()
498 };
499
500 let (force, plan) = match force_migration.as_deref() {
501 None => (false, self.plan_migration(steps)),
502 Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
503 Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
504 Some(other) => panic!("unknown force migration mechanism: {other}"),
505 };
506
507 if self.source_version == self.target_version && !force {
508 info!("skipping migration: already at target version");
509 return Ok(MigrationRunResult::default());
510 } else if self.source_version > self.target_version {
511 bail!("downgrade not supported");
512 }
513
514 if !self.config.read_only {
517 self.upgrade_migration_shard_version().await;
518 }
519
520 info!("executing migration plan: {plan:?}");
521
522 self.migrate_evolve(&plan.evolve).await?;
523 let new_shards = self.migrate_replace(&plan.replace).await?;
524
525 let mut migrated_objects = BTreeSet::new();
526 migrated_objects.extend(plan.evolve);
527 migrated_objects.extend(plan.replace);
528
529 let new_fingerprints = self.update_fingerprints(&migrated_objects)?;
530
531 let (shards_to_finalize, cleanup_action) = self.cleanup().await?;
532
533 Ok(MigrationRunResult {
534 new_shards,
535 new_fingerprints,
536 shards_to_finalize,
537 cleanup_action,
538 })
539 }
540
541 fn validate_migration_steps(&self, steps: &[MigrationStep]) {
545 for step in steps {
546 assert!(
547 step.version <= self.target_version,
548 "migration step version greater than target version: {} > {}",
549 step.version,
550 self.target_version,
551 );
552
553 let object = &step.object;
554
555 assert_ne!(
563 &*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
564 "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
565 );
566
567 assert_ne!(
571 &*MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, object,
572 "mz_object_arrangement_size_history cannot be migrated or else the table will be truncated"
573 );
574
575 assert_ne!(
578 &*MZ_CATALOG_RAW_DESCRIPTION, object,
579 "mz_catalog_raw cannot be migrated"
580 );
581
582 let Some(object_info) = self.system_objects.get(object) else {
583 panic!("migration step for non-existent builtin: {object:?}");
584 };
585
586 let builtin = object_info.builtin;
587 use Builtin::*;
588 assert!(
589 matches!(builtin, Table(..) | Source(..) | MaterializedView(..)),
590 "schema migration not supported for builtin: {builtin:?}",
591 );
592 }
593 }
594
595 fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
597 let steps = steps.iter().filter(|s| s.version > self.source_version);
599
600 let mut by_object = BTreeMap::new();
604 for step in steps {
605 if let Some(entry) = by_object.get_mut(&step.object) {
606 *entry = match (step.mechanism, *entry) {
607 (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
608 (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
609 Mechanism::Replacement
610 }
611 };
612 } else {
613 by_object.insert(step.object.clone(), step.mechanism);
614 }
615 }
616
617 let mut plan = Plan::default();
618 for (object, mechanism) in by_object {
619 match mechanism {
620 Mechanism::Evolution => plan.evolve.push(object),
621 Mechanism::Replacement => plan.replace.push(object),
622 }
623 }
624
625 plan
626 }
627
628 fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
630 let objects = self
631 .system_objects
632 .iter()
633 .filter(|(_, info)| info.shard_id.is_some())
637 .filter(|(_, info)| {
638 use Builtin::*;
639 match info.builtin {
640 Table(table) => **table != *MZ_STORAGE_USAGE_BY_SHARD,
643 MaterializedView(..) => true,
644 Source(source) => **source != *MZ_CATALOG_RAW,
645 Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
646 }
647 })
648 .map(|(object, _)| object.clone())
649 .collect();
650
651 let mut plan = Plan::default();
652 match mechanism {
653 Mechanism::Evolution => plan.evolve = objects,
654 Mechanism::Replacement => plan.replace = objects,
655 }
656
657 plan
658 }
659
660 async fn upgrade_migration_shard_version(&self) {
662 let persist = &self.config.persist_client;
663 let diagnostics = Diagnostics {
664 shard_name: "builtin_migration".to_string(),
665 handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
666 };
667
668 persist
669 .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
670 self.migration_shard,
671 diagnostics,
672 )
673 .await
674 .expect("valid usage");
675 }
676
    /// Migrates the given objects by schema evolution, one after the other.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error reported by `migrate_evolve_one`.
    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
        for object in objects {
            self.migrate_evolve_one(object).await?;
        }
        Ok(())
    }
684
    /// Migrates a single builtin object by evolving the schema registered for its shard to the
    /// `desc` of the builtin definition.
    ///
    /// In read-only mode nothing is written: the latest registered schema (if any) is only
    /// checked for backward compatibility with the target schema.
    ///
    /// # Errors
    ///
    /// Fails if the object is unknown, if it has no shard while in read-only mode, if it is not
    /// a table, source, or materialized view, or if the schema change is incompatible.
    ///
    /// # Panics
    ///
    /// Panics if persist reports invalid usage.
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let persist = &self.config.persist_client;

        let Some(object_info) = self.system_objects.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let id = object_info.global_id;

        // A missing shard is an error in read-only mode; otherwise there is nothing to evolve.
        let Some(shard_id) = object_info.shard_id else {
            if self.config.read_only {
                bail!("missing shard ID for builtin {object:?} ({id})");
            } else {
                return Ok(());
            }
        };

        // Only builtins that carry a `desc` can be evolved.
        let target_desc = match object_info.builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::MaterializedView(mv) => &mv.desc,
            _ => bail!("not a storage collection: {object:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        // Read-only mode: only verify compatibility, never register or evolve a schema.
        if self.config.read_only {
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                         {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        // Determine the schema to evolve from. If none is registered yet, try to register the
        // target schema directly.
        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                if schema_id.is_some() {
                    return Ok(());
                }

                // Registration returned no schema ID; fetch the schema that is expected to be
                // registered by now and evolve from it instead.
                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        // Retry compare-and-evolve until it succeeds or is found incompatible; on an expected
        // mismatch, adopt the schema reported back and try again.
        loop {
            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                     {source_desc:?} -> {target_desc:?}"
                ),
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }
814
815 async fn migrate_replace(
817 &self,
818 objects: &[SystemObjectDescription],
819 ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
820 if objects.is_empty() {
821 return Ok(Default::default());
822 }
823
824 let diagnostics = Diagnostics {
825 shard_name: "builtin_migration".to_string(),
826 handle_purpose: format!("builtin schema migration @ {}", self.target_version),
827 };
828 let (mut persist_write, mut persist_read) =
829 self.open_migration_shard(diagnostics.clone()).await;
830
831 let mut ids_to_replace = BTreeSet::new();
832 for object in objects {
833 if let Some(info) = self.system_objects.get(object) {
834 ids_to_replace.insert(info.global_id);
835 } else {
836 bail!("missing id for builtin {object:?}");
837 }
838 }
839
840 info!(?objects, ?ids_to_replace, "migrating by replacement");
841
842 let replaced_shards = loop {
845 if let Some(shards) = self
846 .try_get_or_insert_replacement_shards(
847 &ids_to_replace,
848 &mut persist_write,
849 &mut persist_read,
850 )
851 .await?
852 {
853 break shards;
854 }
855 };
856
857 Ok(replaced_shards)
858 }
859
860 async fn try_get_or_insert_replacement_shards(
870 &self,
871 ids_to_replace: &BTreeSet<GlobalId>,
872 persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
873 persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
874 ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
875 let upper = persist_write.fetch_recent_upper().await;
876 let write_ts = *upper.as_option().expect("migration shard not sealed");
877
878 let mut ids_to_replace = ids_to_replace.clone();
879 let mut replaced_shards = BTreeMap::new();
880
881 if let Some(read_ts) = write_ts.step_back() {
889 let pred = |key: &migration_shard::Key| {
890 key.build_version == self.target_version
891 && key.deploy_generation == Some(self.deploy_generation)
892 };
893 if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
894 for (key, shard_id) in entries {
895 let id = GlobalId::System(key.global_id);
896 if ids_to_replace.remove(&id) {
897 replaced_shards.insert(id, shard_id);
898 }
899 }
900
901 debug!(
902 %read_ts, ?replaced_shards, ?ids_to_replace,
903 "found existing entries in migration shard",
904 );
905 }
906
907 if ids_to_replace.is_empty() {
908 return Ok(Some(replaced_shards));
909 }
910 }
911
912 let mut updates = Vec::new();
916 for id in ids_to_replace {
917 let shard_id = ShardId::new();
918 replaced_shards.insert(id, shard_id);
919
920 let GlobalId::System(global_id) = id else {
921 bail!("attempt to migrate a non-system collection: {id}");
922 };
923 let key = migration_shard::Key {
924 global_id,
925 build_version: self.target_version.clone(),
926 deploy_generation: Some(self.deploy_generation),
927 };
928 updates.push(((key, shard_id), write_ts, 1));
929 }
930
931 let upper = Antichain::from_elem(write_ts);
932 let new_upper = Antichain::from_elem(write_ts.step_forward());
933 debug!(%write_ts, "attempting insert into migration shard");
934 let result = persist_write
935 .compare_and_append(updates, upper, new_upper)
936 .await
937 .expect("valid usage");
938
939 match result {
940 Ok(()) => {
941 debug!(
942 %write_ts, ?replaced_shards,
943 "successfully inserted into migration shard"
944 );
945 Ok(Some(replaced_shards))
946 }
947 Err(_mismatch) => Ok(None),
948 }
949 }
950
951 async fn open_migration_shard(
953 &self,
954 diagnostics: Diagnostics,
955 ) -> (
956 WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
957 ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
958 ) {
959 let persist = &self.config.persist_client;
960
961 persist
962 .open(
963 self.migration_shard,
964 Arc::new(migration_shard::KeySchema),
965 Arc::new(ShardIdSchema),
966 diagnostics,
967 USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
968 )
969 .await
970 .expect("valid usage")
971 }
972
973 async fn open_migration_shard_since(
975 &self,
976 diagnostics: Diagnostics,
977 ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
978 self.config
979 .persist_client
980 .open_critical_since(
981 self.migration_shard,
982 PersistClient::CONTROLLER_CRITICAL_SINCE,
985 Opaque::encode(&i64::MIN),
986 diagnostics.clone(),
987 )
988 .await
989 .expect("valid usage")
990 }
991
992 fn update_fingerprints(
997 &self,
998 migrated_items: &BTreeSet<SystemObjectDescription>,
999 ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
1000 let mut new_fingerprints = BTreeMap::new();
1001 for (object, object_info) in &self.system_objects {
1002 let id = object_info.global_id;
1003 let builtin = object_info.builtin;
1004
1005 let fingerprint = builtin.fingerprint();
1006 if fingerprint == object_info.fingerprint {
1007 continue; }
1009
1010 let migrated = migrated_items.contains(object);
1012 let ephemeral = matches!(
1014 builtin,
1015 Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
1016 );
1017
1018 if migrated || ephemeral {
1019 new_fingerprints.insert(object.clone(), fingerprint);
1020 } else if builtin.runtime_alterable() {
1021 assert_eq!(
1024 object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
1025 "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
1026 );
1027 } else {
1028 panic!(
1029 "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
1030 fingerprint, object_info.fingerprint,
1031 );
1032 }
1033 }
1034
1035 Ok(new_fingerprints)
1036 }
1037
1038 async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
1055 let noop_action = async {}.boxed();
1056 let noop_result = (BTreeSet::new(), noop_action);
1057
1058 if self.config.read_only {
1059 return Ok(noop_result);
1060 }
1061
1062 let diagnostics = Diagnostics {
1063 shard_name: "builtin_migration".to_string(),
1064 handle_purpose: "builtin schema migration cleanup".into(),
1065 };
1066 let (mut persist_write, mut persist_read) =
1067 self.open_migration_shard(diagnostics.clone()).await;
1068 let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;
1069
1070 let upper = persist_write.fetch_recent_upper().await.clone();
1071 let write_ts = *upper.as_option().expect("migration shard not sealed");
1072 let Some(read_ts) = write_ts.step_back() else {
1073 return Ok(noop_result);
1074 };
1075
1076 let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
1078 let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
1079 else {
1080 return Ok(noop_result);
1081 };
1082
1083 debug!(
1084 ?stale_entries,
1085 "cleaning migration shard up to version {}", self.target_version,
1086 );
1087
1088 let current_shards: BTreeMap<_, _> = self
1089 .system_objects
1090 .values()
1091 .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
1092 .collect();
1093
1094 let mut shards_to_finalize = BTreeSet::new();
1095 let mut retractions = Vec::new();
1096 for (key, shard_id) in stale_entries {
1097 let gid = GlobalId::System(key.global_id);
1101 if current_shards.get(&gid) != Some(&shard_id) {
1102 shards_to_finalize.insert(shard_id);
1103 }
1104
1105 retractions.push(((key, shard_id), write_ts, -1));
1106 }
1107
1108 let cleanup_action = async move {
1109 if !retractions.is_empty() {
1110 let new_upper = Antichain::from_elem(write_ts.step_forward());
1111 let result = persist_write
1112 .compare_and_append(retractions, upper, new_upper)
1113 .await
1114 .expect("valid usage");
1115 match result {
1116 Ok(()) => debug!("cleaned up migration shard"),
1117 Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
1118 }
1119 }
1120 }
1121 .boxed();
1122
1123 let o = persist_since.opaque().clone();
1125 let new_since = Antichain::from_elem(read_ts);
1126 let result = persist_since
1127 .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
1128 .await;
1129 soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");
1130
1131 Ok((shards_to_finalize, cleanup_action))
1132 }
1133}
1134
1135async fn read_migration_shard<P>(
1141 persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
1142 read_ts: Timestamp,
1143 predicate: P,
1144) -> Option<Vec<(migration_shard::Key, ShardId)>>
1145where
1146 P: for<'a> Fn(&migration_shard::Key) -> bool,
1147{
1148 let as_of = Antichain::from_elem(read_ts);
1149 let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
1150
1151 assert!(
1152 updates.iter().all(|(_, _, diff)| *diff == 1),
1153 "migration shard contains invalid diffs: {updates:?}",
1154 );
1155
1156 let entries: Vec<_> = updates
1157 .into_iter()
1158 .map(|(data, _, _)| data)
1159 .filter(move |(key, _)| predicate(key))
1160 .collect();
1161
1162 (!entries.is_empty()).then_some(entries)
1163}
1164
/// A migration plan: which builtin objects to migrate, grouped by mechanism.
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate by schema evolution.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate by shard replacement.
    replace: Vec<SystemObjectDescription>,
}
1173
1174mod migration_shard {
1176 use std::fmt;
1177 use std::str::FromStr;
1178
1179 use arrow::array::{StringArray, StringBuilder};
1180 use bytes::{BufMut, Bytes};
1181 use mz_persist_types::Codec;
1182 use mz_persist_types::codec_impls::{
1183 SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
1184 };
1185 use mz_persist_types::columnar::Schema;
1186 use mz_persist_types::stats::NoneStats;
1187 use semver::Version;
1188 use serde::{Deserialize, Serialize};
1189
    /// Key of an entry in the migration shard.
    ///
    /// Keys are stored in their string form, see the `Display` and `FromStr` implementations.
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
    pub(super) struct Key {
        /// Inner value of the system global ID of the migrated collection.
        pub(super) global_id: u64,
        /// Build version the entry was written for.
        pub(super) build_version: Version,
        /// Deploy generation the entry was written for. `None` for keys parsed from the
        /// `<global_id>-<build_version>` string form, which carries no deploy generation.
        pub(super) deploy_generation: Option<u64>,
    }
1199
1200 impl fmt::Display for Key {
1201 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1202 if self.deploy_generation.is_some() {
1203 let s = serde_json::to_string(self).expect("JSON serializable");
1205 f.write_str(&s)
1206 } else {
1207 write!(f, "{}-{}", self.global_id, self.build_version)
1209 }
1210 }
1211 }
1212
1213 impl FromStr for Key {
1214 type Err = String;
1215
1216 fn from_str(s: &str) -> Result<Self, String> {
1217 if let Ok(key) = serde_json::from_str(s) {
1219 return Ok(key);
1220 };
1221
1222 let parts: Vec<_> = s.splitn(2, '-').collect();
1224 let &[global_id, build_version] = parts.as_slice() else {
1225 return Err(format!("invalid Key '{s}'"));
1226 };
1227 let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
1228 let build_version = build_version
1229 .parse::<Version>()
1230 .map_err(|e| e.to_string())?;
1231 Ok(Key {
1232 global_id,
1233 build_version,
1234 deploy_generation: None,
1235 })
1236 }
1237 }
1238
1239 impl Default for Key {
1240 fn default() -> Self {
1241 Self {
1242 global_id: Default::default(),
1243 build_version: Version::new(0, 0, 0),
1244 deploy_generation: Some(0),
1245 }
1246 }
1247 }
1248
1249 impl Codec for Key {
1250 type Schema = KeySchema;
1251 type Storage = ();
1252
1253 fn codec_name() -> String {
1254 "TableKey".into()
1255 }
1256
1257 fn encode<B: BufMut>(&self, buf: &mut B) {
1258 buf.put(self.to_string().as_bytes())
1259 }
1260
1261 fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
1262 let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
1263 s.parse()
1264 }
1265
1266 fn encode_schema(_schema: &KeySchema) -> Bytes {
1267 Bytes::new()
1268 }
1269
1270 fn decode_schema(buf: &Bytes) -> Self::Schema {
1271 assert_eq!(*buf, Bytes::new());
1272 KeySchema
1273 }
1274 }
1275
1276 impl SimpleColumnarData for Key {
1277 type ArrowBuilder = StringBuilder;
1278 type ArrowColumn = StringArray;
1279
1280 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
1281 builder.values_slice().len()
1282 }
1283
1284 fn push(&self, builder: &mut Self::ArrowBuilder) {
1285 builder.append_value(&self.to_string());
1286 }
1287
1288 fn push_null(builder: &mut Self::ArrowBuilder) {
1289 builder.append_null();
1290 }
1291
1292 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
1293 *self = column.value(idx).parse().expect("valid Key");
1294 }
1295 }
1296
1297 #[derive(Debug, PartialEq)]
1298 pub(super) struct KeySchema;
1299
1300 impl Schema<Key> for KeySchema {
1301 type ArrowColumn = StringArray;
1302 type Statistics = NoneStats;
1303 type Decoder = SimpleColumnarDecoder<Key>;
1304 type Encoder = SimpleColumnarEncoder<Key>;
1305
1306 fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
1307 Ok(SimpleColumnarEncoder::default())
1308 }
1309
1310 fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
1311 Ok(SimpleColumnarDecoder::new(col))
1312 }
1313 }
1314}
1315
1316#[cfg(test)]
1317#[path = "builtin_schema_migration_tests.rs"]
1318mod tests;