1use std::collections::{BTreeMap, BTreeSet};
32use std::sync::{Arc, LazyLock};
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39 BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40 MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, MZ_STORAGE_USAGE_BY_SHARD,
41 MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
42};
43use mz_catalog::config::BuiltinItemMigrationConfig;
44use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
45use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
46use mz_catalog::memory::error::{Error, ErrorKind};
47use mz_ore::soft_assert_or_log;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
49use mz_persist_client::critical::{Opaque, SinceHandle};
50use mz_persist_client::read::ReadHandle;
51use mz_persist_client::schema::CaESchema;
52use mz_persist_client::write::WriteHandle;
53use mz_persist_client::{Diagnostics, PersistClient};
54use mz_persist_types::ShardId;
55use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
56use mz_persist_types::schema::backward_compatible;
57use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
58use mz_repr::{CatalogItemId, GlobalId, Timestamp};
59use mz_sql::catalog::{CatalogItemType, NameReference};
60use mz_storage_client::controller::StorageTxn;
61use mz_storage_types::StorageDiff;
62use mz_storage_types::sources::SourceData;
63use semver::Version;
64use timely::progress::Antichain;
65use tracing::{debug, info};
66
67use crate::catalog::migrate::get_migration_version;
68
69static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
77 vec![
78 MigrationStep::replacement(
79 "0.149.0",
80 CatalogItemType::Source,
81 MZ_INTERNAL_SCHEMA,
82 "mz_sink_statistics_raw",
83 ),
84 MigrationStep::replacement(
85 "0.149.0",
86 CatalogItemType::Source,
87 MZ_INTERNAL_SCHEMA,
88 "mz_source_statistics_raw",
89 ),
90 MigrationStep::evolution(
91 "0.159.0",
92 CatalogItemType::Source,
93 MZ_INTERNAL_SCHEMA,
94 "mz_cluster_replica_metrics_history",
95 ),
96 MigrationStep::replacement(
97 "0.160.0",
98 CatalogItemType::Table,
99 MZ_CATALOG_SCHEMA,
100 "mz_roles",
101 ),
102 MigrationStep::replacement(
103 "0.160.0",
104 CatalogItemType::Table,
105 MZ_CATALOG_SCHEMA,
106 "mz_sinks",
107 ),
108 MigrationStep::replacement(
109 "26.18.0-dev.0",
110 CatalogItemType::MaterializedView,
111 MZ_CATALOG_SCHEMA,
112 "mz_databases",
113 ),
114 MigrationStep::replacement(
115 "26.19.0-dev.0",
116 CatalogItemType::MaterializedView,
117 MZ_CATALOG_SCHEMA,
118 "mz_schemas",
119 ),
120 MigrationStep::replacement(
121 "26.19.0-dev.0",
122 CatalogItemType::MaterializedView,
123 MZ_CATALOG_SCHEMA,
124 "mz_role_members",
125 ),
126 MigrationStep::replacement(
127 "26.19.0-dev.0",
128 CatalogItemType::MaterializedView,
129 MZ_INTERNAL_SCHEMA,
130 "mz_network_policies",
131 ),
132 MigrationStep::replacement(
133 "26.19.0-dev.0",
134 CatalogItemType::MaterializedView,
135 MZ_INTERNAL_SCHEMA,
136 "mz_network_policy_rules",
137 ),
138 MigrationStep::replacement(
139 "26.19.0-dev.0",
140 CatalogItemType::MaterializedView,
141 MZ_INTERNAL_SCHEMA,
142 "mz_cluster_workload_classes",
143 ),
144 MigrationStep::replacement(
145 "26.19.0-dev.0",
146 CatalogItemType::MaterializedView,
147 MZ_INTERNAL_SCHEMA,
148 "mz_internal_cluster_replicas",
149 ),
150 MigrationStep::replacement(
151 "26.19.0-dev.0",
152 CatalogItemType::MaterializedView,
153 MZ_INTERNAL_SCHEMA,
154 "mz_pending_cluster_replicas",
155 ),
156 MigrationStep::replacement(
157 "26.20.0-dev.0",
158 CatalogItemType::MaterializedView,
159 MZ_CATALOG_SCHEMA,
160 "mz_materialized_views",
161 ),
162 MigrationStep::replacement(
163 "26.22.0-dev.0",
164 CatalogItemType::MaterializedView,
165 MZ_CATALOG_SCHEMA,
166 "mz_connections",
167 ),
168 MigrationStep::replacement(
169 "26.22.0-dev.0",
170 CatalogItemType::MaterializedView,
171 MZ_CATALOG_SCHEMA,
172 "mz_secrets",
173 ),
174 MigrationStep::replacement(
175 "26.27.0-dev.0",
176 CatalogItemType::MaterializedView,
177 MZ_CATALOG_SCHEMA,
178 "mz_sources",
179 ),
180 ]
181});
182
183#[derive(Clone, Debug)]
185struct MigrationStep {
186 version: Version,
188 object: SystemObjectDescription,
190 mechanism: Mechanism,
192}
193
194impl MigrationStep {
195 fn evolution(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
197 Self {
198 version: Version::parse(version).expect("valid"),
199 object: SystemObjectDescription {
200 schema_name: schema.into(),
201 object_type: type_,
202 object_name: name.into(),
203 },
204 mechanism: Mechanism::Evolution,
205 }
206 }
207
208 fn replacement(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
210 Self {
211 version: Version::parse(version).expect("valid"),
212 object: SystemObjectDescription {
213 schema_name: schema.into(),
214 object_type: type_,
215 object_name: name.into(),
216 },
217 mechanism: Mechanism::Replacement,
218 }
219 }
220}
221
/// The strategy used to migrate a builtin collection to a new schema.
///
/// The derived ordering follows declaration order, so `Evolution < Replacement`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
// NOTE(review): presumably this allowance covers builds in which one of the variants is never
// constructed — confirm before removing it.
#[allow(dead_code)]
enum Mechanism {
    /// Evolve the schema of the collection's existing shard in place
    /// (see `Migration::migrate_evolve`).
    Evolution,
    /// Give the collection a new shard (see `Migration::migrate_replace`).
    Replacement,
}
236
237pub(super) struct MigrationResult {
239 pub replaced_items: BTreeSet<CatalogItemId>,
241 pub cleanup_action: BoxFuture<'static, ()>,
243}
244
245impl Default for MigrationResult {
246 fn default() -> Self {
247 Self {
248 replaced_items: Default::default(),
249 cleanup_action: async {}.boxed(),
250 }
251 }
252}
253
254pub(super) async fn run(
260 build_info: &BuildInfo,
261 deploy_generation: u64,
262 txn: &mut Transaction<'_>,
263 config: BuiltinItemMigrationConfig,
264) -> Result<MigrationResult, Error> {
265 assert_eq!(config.read_only, txn.is_savepoint());
267
268 if *build_info == DUMMY_BUILD_INFO {
271 return Ok(MigrationResult::default());
272 }
273
274 let Some(durable_version) = get_migration_version(txn) else {
275 return Ok(MigrationResult::default());
277 };
278 let build_version = build_info.semver_version();
279
280 let collection_metadata = txn.get_collection_metadata();
281 let system_objects = txn
282 .get_system_object_mappings()
283 .map(|m| {
284 let object = m.description;
285 let global_id = m.unique_identifier.global_id;
286 let shard_id = collection_metadata.get(&global_id).copied();
287 let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
288 panic!("missing builtin {object:?}");
289 };
290 let info = ObjectInfo {
291 global_id,
292 shard_id,
293 builtin,
294 fingerprint: m.unique_identifier.fingerprint,
295 };
296 (object, info)
297 })
298 .collect();
299
300 let migration_shard = txn.get_builtin_migration_shard().expect("must exist");
301
302 let migration = Migration {
303 source_version: durable_version.clone(),
304 target_version: build_version.clone(),
305 deploy_generation,
306 system_objects,
307 migration_shard,
308 config,
309 };
310
311 let result = migration.run(&MIGRATIONS).await.map_err(|e| {
312 Error::new(ErrorKind::FailedBuiltinSchemaMigration {
313 last_seen_version: durable_version.to_string(),
314 this_version: build_version.to_string(),
315 cause: e.to_string(),
316 })
317 })?;
318
319 result.apply(txn);
320
321 let replaced_items = txn
322 .get_system_object_mappings()
323 .map(|m| m.unique_identifier)
324 .filter(|ids| result.new_shards.contains_key(&ids.global_id))
325 .map(|ids| ids.catalog_id)
326 .collect();
327
328 Ok(MigrationResult {
329 replaced_items,
330 cleanup_action: result.cleanup_action,
331 })
332}
333
334struct MigrationRunResult {
336 new_shards: BTreeMap<GlobalId, ShardId>,
337 new_fingerprints: BTreeMap<SystemObjectDescription, String>,
338 shards_to_finalize: BTreeSet<ShardId>,
339 cleanup_action: BoxFuture<'static, ()>,
340}
341
342impl Default for MigrationRunResult {
343 fn default() -> Self {
344 Self {
345 new_shards: BTreeMap::new(),
346 new_fingerprints: BTreeMap::new(),
347 shards_to_finalize: BTreeSet::new(),
348 cleanup_action: async {}.boxed(),
349 }
350 }
351}
352
353impl MigrationRunResult {
354 fn apply(&self, txn: &mut Transaction<'_>) {
356 let replaced_ids = self.new_shards.keys().copied().collect();
358 let old_metadata = txn.delete_collection_metadata(replaced_ids);
359 txn.insert_collection_metadata(self.new_shards.clone())
360 .expect("inserting unique shards IDs after deleting existing entries");
361
362 let mut unfinalized_shards: BTreeSet<_> =
364 old_metadata.into_iter().map(|(_, sid)| sid).collect();
365 unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
366 txn.insert_unfinalized_shards(unfinalized_shards)
367 .expect("cannot fail");
368
369 let mappings = txn
371 .get_system_object_mappings()
372 .filter_map(|m| {
373 let fingerprint = self.new_fingerprints.get(&m.description)?;
374 Some(SystemObjectMapping {
375 description: m.description,
376 unique_identifier: SystemObjectUniqueIdentifier {
377 catalog_id: m.unique_identifier.catalog_id,
378 global_id: m.unique_identifier.global_id,
379 fingerprint: fingerprint.clone(),
380 },
381 })
382 })
383 .collect();
384 txn.set_system_object_mappings(mappings)
385 .expect("filtered existing mappings remain unique");
386 }
387}
388
389#[derive(Clone, Debug)]
391struct ObjectInfo {
392 global_id: GlobalId,
393 shard_id: Option<ShardId>,
394 builtin: &'static Builtin<NameReference>,
395 fingerprint: String,
396}
397
398struct Migration {
400 source_version: Version,
405 target_version: Version,
409 deploy_generation: u64,
411 system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
413 migration_shard: ShardId,
415 config: BuiltinItemMigrationConfig,
417}
418
419impl Migration {
420 async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
421 info!(
422 deploy_generation = %self.deploy_generation,
423 "running builtin schema migration: {} -> {}",
424 self.source_version, self.target_version
425 );
426
427 self.validate_migration_steps(steps);
428
429 let force_migration = if self.source_version != self.target_version
432 && self.source_version.pre.as_str().starts_with("dev")
433 && self.config.force_migration.is_none()
434 {
435 Some("evolution".to_string())
436 } else {
437 self.config.force_migration.clone()
438 };
439
440 let (force, plan) = match force_migration.as_deref() {
441 None => (false, self.plan_migration(steps)),
442 Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
443 Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
444 Some(other) => panic!("unknown force migration mechanism: {other}"),
445 };
446
447 if self.source_version == self.target_version && !force {
448 info!("skipping migration: already at target version");
449 return Ok(MigrationRunResult::default());
450 } else if self.source_version > self.target_version {
451 bail!("downgrade not supported");
452 }
453
454 if !self.config.read_only {
457 self.upgrade_migration_shard_version().await;
458 }
459
460 info!("executing migration plan: {plan:?}");
461
462 self.migrate_evolve(&plan.evolve).await?;
463 let new_shards = self.migrate_replace(&plan.replace).await?;
464
465 let mut migrated_objects = BTreeSet::new();
466 migrated_objects.extend(plan.evolve);
467 migrated_objects.extend(plan.replace);
468
469 let new_fingerprints = self.update_fingerprints(&migrated_objects)?;
470
471 let (shards_to_finalize, cleanup_action) = self.cleanup().await?;
472
473 Ok(MigrationRunResult {
474 new_shards,
475 new_fingerprints,
476 shards_to_finalize,
477 cleanup_action,
478 })
479 }
480
481 fn validate_migration_steps(&self, steps: &[MigrationStep]) {
485 for step in steps {
486 assert!(
487 step.version <= self.target_version,
488 "migration step version greater than target version: {} > {}",
489 step.version,
490 self.target_version,
491 );
492
493 let object = &step.object;
494
495 assert_ne!(
503 &*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
504 "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
505 );
506
507 assert_ne!(
511 &*MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, object,
512 "mz_object_arrangement_size_history cannot be migrated or else the table will be truncated"
513 );
514
515 assert_ne!(
518 &*MZ_CATALOG_RAW_DESCRIPTION, object,
519 "mz_catalog_raw cannot be migrated"
520 );
521
522 let Some(object_info) = self.system_objects.get(object) else {
523 panic!("migration step for non-existent builtin: {object:?}");
524 };
525
526 let builtin = object_info.builtin;
527 use Builtin::*;
528 assert!(
529 matches!(builtin, Table(..) | Source(..) | MaterializedView(..)),
530 "schema migration not supported for builtin: {builtin:?}",
531 );
532 }
533 }
534
535 fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
537 let steps = steps.iter().filter(|s| s.version > self.source_version);
539
540 let mut by_object = BTreeMap::new();
544 for step in steps {
545 if let Some(entry) = by_object.get_mut(&step.object) {
546 *entry = match (step.mechanism, *entry) {
547 (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
548 (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
549 Mechanism::Replacement
550 }
551 };
552 } else {
553 by_object.insert(step.object.clone(), step.mechanism);
554 }
555 }
556
557 let mut plan = Plan::default();
558 for (object, mechanism) in by_object {
559 match mechanism {
560 Mechanism::Evolution => plan.evolve.push(object),
561 Mechanism::Replacement => plan.replace.push(object),
562 }
563 }
564
565 plan
566 }
567
568 fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
570 let objects = self
571 .system_objects
572 .iter()
573 .filter(|(_, info)| info.shard_id.is_some())
577 .filter(|(_, info)| {
578 use Builtin::*;
579 match info.builtin {
580 Table(table) => **table != *MZ_STORAGE_USAGE_BY_SHARD,
583 MaterializedView(..) => true,
584 Source(source) => **source != *MZ_CATALOG_RAW,
585 Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
586 }
587 })
588 .map(|(object, _)| object.clone())
589 .collect();
590
591 let mut plan = Plan::default();
592 match mechanism {
593 Mechanism::Evolution => plan.evolve = objects,
594 Mechanism::Replacement => plan.replace = objects,
595 }
596
597 plan
598 }
599
600 async fn upgrade_migration_shard_version(&self) {
602 let persist = &self.config.persist_client;
603 let diagnostics = Diagnostics {
604 shard_name: "builtin_migration".to_string(),
605 handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
606 };
607
608 persist
609 .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
610 self.migration_shard,
611 diagnostics,
612 )
613 .await
614 .expect("valid usage");
615 }
616
617 async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
619 for object in objects {
620 self.migrate_evolve_one(object).await?;
621 }
622 Ok(())
623 }
624
625 async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
626 let persist = &self.config.persist_client;
627
628 let Some(object_info) = self.system_objects.get(object) else {
629 bail!("missing builtin {object:?}");
630 };
631 let id = object_info.global_id;
632
633 let Some(shard_id) = object_info.shard_id else {
634 if self.config.read_only {
639 bail!("missing shard ID for builtin {object:?} ({id})");
640 } else {
641 return Ok(());
642 }
643 };
644
645 let target_desc = match object_info.builtin {
646 Builtin::Table(table) => &table.desc,
647 Builtin::Source(source) => &source.desc,
648 Builtin::MaterializedView(mv) => &mv.desc,
649 _ => bail!("not a storage collection: {object:?}"),
650 };
651
652 let diagnostics = Diagnostics {
653 shard_name: id.to_string(),
654 handle_purpose: format!("builtin schema migration @ {}", self.target_version),
655 };
656 let source_schema = persist
657 .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
658 .await
659 .expect("valid usage");
660
661 info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");
662
663 if self.config.read_only {
664 if let Some((_, source_desc, _)) = &source_schema {
667 let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
668 let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
669 if backward_compatible(&old, &new).is_none() {
670 bail!(
671 "incompatible schema evolution for {object:?}: \
672 {source_desc:?} -> {target_desc:?}"
673 );
674 }
675 }
676
677 return Ok(());
678 }
679
680 let (mut schema_id, mut source_desc) = match source_schema {
681 Some((schema_id, source_desc, _)) => (schema_id, source_desc),
682 None => {
683 debug!(%id, %shard_id, "no previous schema found; registering initial one");
688 let schema_id = persist
689 .register_schema::<SourceData, (), Timestamp, StorageDiff>(
690 shard_id,
691 target_desc,
692 &UnitSchema,
693 diagnostics.clone(),
694 )
695 .await
696 .expect("valid usage");
697 if schema_id.is_some() {
698 return Ok(());
699 }
700
701 debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
702 let (schema_id, source_desc, _) = persist
703 .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
704 shard_id,
705 diagnostics.clone(),
706 )
707 .await
708 .expect("valid usage")
709 .expect("known to exist");
710
711 (schema_id, source_desc)
712 }
713 };
714
715 loop {
716 debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
721 let result = persist
722 .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
723 shard_id,
724 schema_id,
725 target_desc,
726 &UnitSchema,
727 diagnostics.clone(),
728 )
729 .await
730 .expect("valid usage");
731
732 match result {
733 CaESchema::Ok(schema_id) => {
734 debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
735 break;
736 }
737 CaESchema::Incompatible => bail!(
738 "incompatible schema evolution for {object:?}: \
739 {source_desc:?} -> {target_desc:?}"
740 ),
741 CaESchema::ExpectedMismatch {
742 schema_id: new_id,
743 key,
744 val: UnitSchema,
745 } => {
746 schema_id = new_id;
747 source_desc = key;
748 }
749 }
750 }
751
752 Ok(())
753 }
754
755 async fn migrate_replace(
757 &self,
758 objects: &[SystemObjectDescription],
759 ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
760 if objects.is_empty() {
761 return Ok(Default::default());
762 }
763
764 let diagnostics = Diagnostics {
765 shard_name: "builtin_migration".to_string(),
766 handle_purpose: format!("builtin schema migration @ {}", self.target_version),
767 };
768 let (mut persist_write, mut persist_read) =
769 self.open_migration_shard(diagnostics.clone()).await;
770
771 let mut ids_to_replace = BTreeSet::new();
772 for object in objects {
773 if let Some(info) = self.system_objects.get(object) {
774 ids_to_replace.insert(info.global_id);
775 } else {
776 bail!("missing id for builtin {object:?}");
777 }
778 }
779
780 info!(?objects, ?ids_to_replace, "migrating by replacement");
781
782 let replaced_shards = loop {
785 if let Some(shards) = self
786 .try_get_or_insert_replacement_shards(
787 &ids_to_replace,
788 &mut persist_write,
789 &mut persist_read,
790 )
791 .await?
792 {
793 break shards;
794 }
795 };
796
797 Ok(replaced_shards)
798 }
799
800 async fn try_get_or_insert_replacement_shards(
810 &self,
811 ids_to_replace: &BTreeSet<GlobalId>,
812 persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
813 persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
814 ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
815 let upper = persist_write.fetch_recent_upper().await;
816 let write_ts = *upper.as_option().expect("migration shard not sealed");
817
818 let mut ids_to_replace = ids_to_replace.clone();
819 let mut replaced_shards = BTreeMap::new();
820
821 if let Some(read_ts) = write_ts.step_back() {
829 let pred = |key: &migration_shard::Key| {
830 key.build_version == self.target_version
831 && key.deploy_generation == Some(self.deploy_generation)
832 };
833 if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
834 for (key, shard_id) in entries {
835 let id = GlobalId::System(key.global_id);
836 if ids_to_replace.remove(&id) {
837 replaced_shards.insert(id, shard_id);
838 }
839 }
840
841 debug!(
842 %read_ts, ?replaced_shards, ?ids_to_replace,
843 "found existing entries in migration shard",
844 );
845 }
846
847 if ids_to_replace.is_empty() {
848 return Ok(Some(replaced_shards));
849 }
850 }
851
852 let mut updates = Vec::new();
856 for id in ids_to_replace {
857 let shard_id = ShardId::new();
858 replaced_shards.insert(id, shard_id);
859
860 let GlobalId::System(global_id) = id else {
861 bail!("attempt to migrate a non-system collection: {id}");
862 };
863 let key = migration_shard::Key {
864 global_id,
865 build_version: self.target_version.clone(),
866 deploy_generation: Some(self.deploy_generation),
867 };
868 updates.push(((key, shard_id), write_ts, 1));
869 }
870
871 let upper = Antichain::from_elem(write_ts);
872 let new_upper = Antichain::from_elem(write_ts.step_forward());
873 debug!(%write_ts, "attempting insert into migration shard");
874 let result = persist_write
875 .compare_and_append(updates, upper, new_upper)
876 .await
877 .expect("valid usage");
878
879 match result {
880 Ok(()) => {
881 debug!(
882 %write_ts, ?replaced_shards,
883 "successfully inserted into migration shard"
884 );
885 Ok(Some(replaced_shards))
886 }
887 Err(_mismatch) => Ok(None),
888 }
889 }
890
891 async fn open_migration_shard(
893 &self,
894 diagnostics: Diagnostics,
895 ) -> (
896 WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
897 ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
898 ) {
899 let persist = &self.config.persist_client;
900
901 persist
902 .open(
903 self.migration_shard,
904 Arc::new(migration_shard::KeySchema),
905 Arc::new(ShardIdSchema),
906 diagnostics,
907 USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
908 )
909 .await
910 .expect("valid usage")
911 }
912
913 async fn open_migration_shard_since(
915 &self,
916 diagnostics: Diagnostics,
917 ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
918 self.config
919 .persist_client
920 .open_critical_since(
921 self.migration_shard,
922 PersistClient::CONTROLLER_CRITICAL_SINCE,
925 Opaque::encode(&i64::MIN),
926 diagnostics.clone(),
927 )
928 .await
929 .expect("valid usage")
930 }
931
932 fn update_fingerprints(
937 &self,
938 migrated_items: &BTreeSet<SystemObjectDescription>,
939 ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
940 let mut new_fingerprints = BTreeMap::new();
941 for (object, object_info) in &self.system_objects {
942 let id = object_info.global_id;
943 let builtin = object_info.builtin;
944
945 let fingerprint = builtin.fingerprint();
946 if fingerprint == object_info.fingerprint {
947 continue; }
949
950 let migrated = migrated_items.contains(object);
952 let ephemeral = matches!(
954 builtin,
955 Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
956 );
957
958 if migrated || ephemeral {
959 new_fingerprints.insert(object.clone(), fingerprint);
960 } else if builtin.runtime_alterable() {
961 assert_eq!(
964 object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
965 "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
966 );
967 } else {
968 panic!(
969 "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
970 fingerprint, object_info.fingerprint,
971 );
972 }
973 }
974
975 Ok(new_fingerprints)
976 }
977
978 async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
995 let noop_action = async {}.boxed();
996 let noop_result = (BTreeSet::new(), noop_action);
997
998 if self.config.read_only {
999 return Ok(noop_result);
1000 }
1001
1002 let diagnostics = Diagnostics {
1003 shard_name: "builtin_migration".to_string(),
1004 handle_purpose: "builtin schema migration cleanup".into(),
1005 };
1006 let (mut persist_write, mut persist_read) =
1007 self.open_migration_shard(diagnostics.clone()).await;
1008 let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;
1009
1010 let upper = persist_write.fetch_recent_upper().await.clone();
1011 let write_ts = *upper.as_option().expect("migration shard not sealed");
1012 let Some(read_ts) = write_ts.step_back() else {
1013 return Ok(noop_result);
1014 };
1015
1016 let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
1018 let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
1019 else {
1020 return Ok(noop_result);
1021 };
1022
1023 debug!(
1024 ?stale_entries,
1025 "cleaning migration shard up to version {}", self.target_version,
1026 );
1027
1028 let current_shards: BTreeMap<_, _> = self
1029 .system_objects
1030 .values()
1031 .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
1032 .collect();
1033
1034 let mut shards_to_finalize = BTreeSet::new();
1035 let mut retractions = Vec::new();
1036 for (key, shard_id) in stale_entries {
1037 let gid = GlobalId::System(key.global_id);
1041 if current_shards.get(&gid) != Some(&shard_id) {
1042 shards_to_finalize.insert(shard_id);
1043 }
1044
1045 retractions.push(((key, shard_id), write_ts, -1));
1046 }
1047
1048 let cleanup_action = async move {
1049 if !retractions.is_empty() {
1050 let new_upper = Antichain::from_elem(write_ts.step_forward());
1051 let result = persist_write
1052 .compare_and_append(retractions, upper, new_upper)
1053 .await
1054 .expect("valid usage");
1055 match result {
1056 Ok(()) => debug!("cleaned up migration shard"),
1057 Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
1058 }
1059 }
1060 }
1061 .boxed();
1062
1063 let o = persist_since.opaque().clone();
1065 let new_since = Antichain::from_elem(read_ts);
1066 let result = persist_since
1067 .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
1068 .await;
1069 soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");
1070
1071 Ok((shards_to_finalize, cleanup_action))
1072 }
1073}
1074
1075async fn read_migration_shard<P>(
1081 persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
1082 read_ts: Timestamp,
1083 predicate: P,
1084) -> Option<Vec<(migration_shard::Key, ShardId)>>
1085where
1086 P: for<'a> Fn(&migration_shard::Key) -> bool,
1087{
1088 let as_of = Antichain::from_elem(read_ts);
1089 let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
1090
1091 assert!(
1092 updates.iter().all(|(_, _, diff)| *diff == 1),
1093 "migration shard contains invalid diffs: {updates:?}",
1094 );
1095
1096 let entries: Vec<_> = updates
1097 .into_iter()
1098 .map(|(data, _, _)| data)
1099 .filter(move |(key, _)| predicate(key))
1100 .collect();
1101
1102 (!entries.is_empty()).then_some(entries)
1103}
1104
1105#[derive(Debug, Default)]
1107struct Plan {
1108 evolve: Vec<SystemObjectDescription>,
1110 replace: Vec<SystemObjectDescription>,
1112}
1113
1114mod migration_shard {
1116 use std::fmt;
1117 use std::str::FromStr;
1118
1119 use arrow::array::{StringArray, StringBuilder};
1120 use bytes::{BufMut, Bytes};
1121 use mz_persist_types::Codec;
1122 use mz_persist_types::codec_impls::{
1123 SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
1124 };
1125 use mz_persist_types::columnar::Schema;
1126 use mz_persist_types::stats::NoneStats;
1127 use semver::Version;
1128 use serde::{Deserialize, Serialize};
1129
1130 #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
1131 pub(super) struct Key {
1132 pub(super) global_id: u64,
1133 pub(super) build_version: Version,
1134 pub(super) deploy_generation: Option<u64>,
1138 }
1139
1140 impl fmt::Display for Key {
1141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1142 if self.deploy_generation.is_some() {
1143 let s = serde_json::to_string(self).expect("JSON serializable");
1145 f.write_str(&s)
1146 } else {
1147 write!(f, "{}-{}", self.global_id, self.build_version)
1149 }
1150 }
1151 }
1152
1153 impl FromStr for Key {
1154 type Err = String;
1155
1156 fn from_str(s: &str) -> Result<Self, String> {
1157 if let Ok(key) = serde_json::from_str(s) {
1159 return Ok(key);
1160 };
1161
1162 let parts: Vec<_> = s.splitn(2, '-').collect();
1164 let &[global_id, build_version] = parts.as_slice() else {
1165 return Err(format!("invalid Key '{s}'"));
1166 };
1167 let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
1168 let build_version = build_version
1169 .parse::<Version>()
1170 .map_err(|e| e.to_string())?;
1171 Ok(Key {
1172 global_id,
1173 build_version,
1174 deploy_generation: None,
1175 })
1176 }
1177 }
1178
1179 impl Default for Key {
1180 fn default() -> Self {
1181 Self {
1182 global_id: Default::default(),
1183 build_version: Version::new(0, 0, 0),
1184 deploy_generation: Some(0),
1185 }
1186 }
1187 }
1188
1189 impl Codec for Key {
1190 type Schema = KeySchema;
1191 type Storage = ();
1192
1193 fn codec_name() -> String {
1194 "TableKey".into()
1195 }
1196
1197 fn encode<B: BufMut>(&self, buf: &mut B) {
1198 buf.put(self.to_string().as_bytes())
1199 }
1200
1201 fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
1202 let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
1203 s.parse()
1204 }
1205
1206 fn encode_schema(_schema: &KeySchema) -> Bytes {
1207 Bytes::new()
1208 }
1209
1210 fn decode_schema(buf: &Bytes) -> Self::Schema {
1211 assert_eq!(*buf, Bytes::new());
1212 KeySchema
1213 }
1214 }
1215
1216 impl SimpleColumnarData for Key {
1217 type ArrowBuilder = StringBuilder;
1218 type ArrowColumn = StringArray;
1219
1220 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
1221 builder.values_slice().len()
1222 }
1223
1224 fn push(&self, builder: &mut Self::ArrowBuilder) {
1225 builder.append_value(&self.to_string());
1226 }
1227
1228 fn push_null(builder: &mut Self::ArrowBuilder) {
1229 builder.append_null();
1230 }
1231
1232 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
1233 *self = column.value(idx).parse().expect("valid Key");
1234 }
1235 }
1236
1237 #[derive(Debug, PartialEq)]
1238 pub(super) struct KeySchema;
1239
1240 impl Schema<Key> for KeySchema {
1241 type ArrowColumn = StringArray;
1242 type Statistics = NoneStats;
1243 type Decoder = SimpleColumnarDecoder<Key>;
1244 type Encoder = SimpleColumnarEncoder<Key>;
1245
1246 fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
1247 Ok(SimpleColumnarEncoder::default())
1248 }
1249
1250 fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
1251 Ok(SimpleColumnarDecoder::new(col))
1252 }
1253 }
1254}
1255
1256#[cfg(test)]
1257#[path = "builtin_schema_migration_tests.rs"]
1258mod tests;