1use std::collections::{BTreeMap, BTreeSet};
32use std::sync::Arc;
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39 BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40 MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
41};
42use mz_catalog::config::BuiltinItemMigrationConfig;
43use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
44use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
45use mz_catalog::memory::error::{Error, ErrorKind};
46use mz_ore::soft_assert_or_log;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
48use mz_persist_client::critical::{Opaque, SinceHandle};
49use mz_persist_client::read::ReadHandle;
50use mz_persist_client::schema::CaESchema;
51use mz_persist_client::write::WriteHandle;
52use mz_persist_client::{Diagnostics, PersistClient};
53use mz_persist_types::ShardId;
54use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
55use mz_persist_types::schema::backward_compatible;
56use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
57use mz_repr::{CatalogItemId, GlobalId, Timestamp};
58use mz_sql::catalog::{CatalogItemType, NameReference};
59use mz_storage_client::controller::StorageTxn;
60use mz_storage_types::StorageDiff;
61use mz_storage_types::sources::SourceData;
62use semver::Version;
63use timely::progress::Antichain;
64use tracing::{debug, info};
65
66use crate::catalog::migrate::get_migration_version;
67
/// The list of builtin schema migration steps.
///
/// Each step names a single builtin object and the mechanism used to migrate
/// its persist shard to the schema shipped in the given version. During a
/// migration run, only steps whose `version` is greater than the durable
/// catalog's last-seen version are applied (see `Migration::plan_migration`).
const MIGRATIONS: &[MigrationStep] = &[
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_sink_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_source_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 159, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_cluster_replica_metrics_history",
        },
        mechanism: Mechanism::Evolution,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_roles",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_sinks",
        },
        mechanism: Mechanism::Replacement,
    },
];
122
/// A single builtin schema migration step.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The build version that introduced this step.
    version: Version,
    /// The builtin object to migrate.
    object: Object,
    /// How the object's shard is migrated to the new schema.
    mechanism: Mechanism,
}
133
/// The mechanism by which a builtin object's persist shard is migrated to a
/// new schema.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
// A variant may be unused at any given time, depending on which mechanisms
// the current `MIGRATIONS` list happens to employ.
#[allow(dead_code)]
enum Mechanism {
    /// Evolve the schema of the existing shard in place via persist schema
    /// evolution (`compare_and_evolve_schema`); requires the new schema to be
    /// backward compatible with the registered one.
    Evolution,
    /// Replace the object's shard with a new, empty one; the old shard is
    /// scheduled for finalization (see `MigrationRunResult::apply`).
    Replacement,
}
148
/// Identifies a builtin catalog object by item type, schema, and name.
///
/// This is a `'static` mirror of `SystemObjectDescription`, convertible via
/// the `From` impl below.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Object {
    /// The catalog item type (table, source, ...).
    type_: CatalogItemType,
    /// The name of the schema containing the object.
    schema: &'static str,
    /// The object's name within its schema.
    name: &'static str,
}
159
160impl From<Object> for SystemObjectDescription {
161 fn from(object: Object) -> Self {
162 SystemObjectDescription {
163 schema_name: object.schema.into(),
164 object_type: object.type_,
165 object_name: object.name.into(),
166 }
167 }
168}
169
/// The result of running the builtin schema migrations, as seen by the
/// caller.
pub(super) struct MigrationResult {
    /// Catalog IDs of builtin items whose persist shards were replaced.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// Deferred action that retracts stale migration-shard entries; intended
    /// to be run by the caller at a later point (presumably after the catalog
    /// transaction commits — confirm with callers).
    pub cleanup_action: BoxFuture<'static, ()>,
}
177
178impl Default for MigrationResult {
179 fn default() -> Self {
180 Self {
181 replaced_items: Default::default(),
182 cleanup_action: async {}.boxed(),
183 }
184 }
185}
186
/// Migrates builtin objects to the schemas of the current build, if
/// necessary.
///
/// Reads the last-seen version and the system object mappings from the
/// durable catalog state in `txn`, runs the applicable `MIGRATIONS` steps,
/// and applies the resulting changes back to `txn`.
///
/// Returns the set of replaced builtin items and a deferred cleanup action
/// for the caller to run.
///
/// # Errors
///
/// Returns `ErrorKind::FailedBuiltinSchemaMigration` if any migration step
/// fails.
pub(super) async fn run(
    build_info: &BuildInfo,
    deploy_generation: u64,
    txn: &mut Transaction<'_>,
    config: BuiltinItemMigrationConfig,
) -> Result<MigrationResult, Error> {
    // A read-only migration must run on a savepoint transaction (and vice
    // versa), so its changes are never durably committed.
    assert_eq!(config.read_only, txn.is_savepoint());

    // A dummy build (e.g. in some tests) has no meaningful version to
    // migrate to.
    if *build_info == DUMMY_BUILD_INFO {
        return Ok(MigrationResult::default());
    }

    // Without a recorded durable version there is nothing to migrate from
    // (presumably a fresh catalog or one predating version tracking — see
    // `get_migration_version`).
    let Some(durable_version) = get_migration_version(txn) else {
        return Ok(MigrationResult::default());
    };
    let build_version = build_info.semver_version();

    // Join each durable system object mapping with its static builtin
    // definition and, if it is a storage collection, its persist shard.
    let collection_metadata = txn.get_collection_metadata();
    let system_objects = txn
        .get_system_object_mappings()
        .map(|m| {
            let object = m.description;
            let global_id = m.unique_identifier.global_id;
            let shard_id = collection_metadata.get(&global_id).copied();
            let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
                panic!("missing builtin {object:?}");
            };
            let info = ObjectInfo {
                global_id,
                shard_id,
                builtin,
                fingerprint: m.unique_identifier.fingerprint,
            };
            (object, info)
        })
        .collect();

    let migration_shard = txn.get_builtin_migration_shard().expect("must exist");

    let migration = Migration {
        source_version: durable_version.clone(),
        target_version: build_version.clone(),
        deploy_generation,
        system_objects,
        migration_shard,
        config,
    };

    // Wrap any migration failure in a catalog error that reports both
    // versions involved.
    let result = migration.run(MIGRATIONS).await.map_err(|e| {
        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
            last_seen_version: durable_version.to_string(),
            this_version: build_version.to_string(),
            cause: e.to_string(),
        })
    })?;

    // Record new shards, fingerprints, and unfinalized shards in the
    // transaction.
    result.apply(txn);

    // Items whose global IDs received new shards are exactly the ones that
    // were migrated by replacement.
    let replaced_items = txn
        .get_system_object_mappings()
        .map(|m| m.unique_identifier)
        .filter(|ids| result.new_shards.contains_key(&ids.global_id))
        .map(|ids| ids.catalog_id)
        .collect();

    Ok(MigrationResult {
        replaced_items,
        cleanup_action: result.cleanup_action,
    })
}
266
/// The outcome of `Migration::run`, to be applied to the catalog transaction
/// via `MigrationRunResult::apply`.
struct MigrationRunResult {
    /// New persist shards for objects migrated by replacement, keyed by
    /// global ID.
    new_shards: BTreeMap<GlobalId, ShardId>,
    /// Updated fingerprints for migrated and ephemeral builtin objects.
    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
    /// Shards that are no longer referenced and should be finalized.
    shards_to_finalize: BTreeSet<ShardId>,
    /// Deferred retraction of stale migration-shard entries.
    cleanup_action: BoxFuture<'static, ()>,
}
274
275impl Default for MigrationRunResult {
276 fn default() -> Self {
277 Self {
278 new_shards: BTreeMap::new(),
279 new_fingerprints: BTreeMap::new(),
280 shards_to_finalize: BTreeSet::new(),
281 cleanup_action: async {}.boxed(),
282 }
283 }
284}
285
impl MigrationRunResult {
    /// Applies this migration result to the given catalog transaction.
    fn apply(&self, txn: &mut Transaction<'_>) {
        // Swap in the new shard IDs for all replaced collections.
        let replaced_ids = self.new_shards.keys().copied().collect();
        let old_metadata = txn.delete_collection_metadata(replaced_ids);
        txn.insert_collection_metadata(self.new_shards.clone())
            .expect("inserting unique shards IDs after deleting existing entries");

        // The previous shards of replaced collections, plus any shards
        // explicitly marked for finalization, are now unreferenced.
        let mut unfinalized_shards: BTreeSet<_> =
            old_metadata.into_iter().map(|(_, sid)| sid).collect();
        unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
        txn.insert_unfinalized_shards(unfinalized_shards)
            .expect("cannot fail");

        // Update the fingerprints of migrated objects, keeping their catalog
        // and global IDs unchanged.
        let mappings = txn
            .get_system_object_mappings()
            .filter_map(|m| {
                let fingerprint = self.new_fingerprints.get(&m.description)?;
                Some(SystemObjectMapping {
                    description: m.description,
                    unique_identifier: SystemObjectUniqueIdentifier {
                        catalog_id: m.unique_identifier.catalog_id,
                        global_id: m.unique_identifier.global_id,
                        fingerprint: fingerprint.clone(),
                    },
                })
            })
            .collect();
        txn.set_system_object_mappings(mappings)
            .expect("filtered existing mappings remain unique");
    }
}
321
/// Information about an existing builtin object, assembled from the durable
/// catalog state and the static builtin definitions.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// The object's current global ID.
    global_id: GlobalId,
    /// The object's persist shard, if one is recorded in the collection
    /// metadata.
    shard_id: Option<ShardId>,
    /// The static builtin definition.
    builtin: &'static Builtin<NameReference>,
    /// The fingerprint recorded in the durable catalog.
    fingerprint: String,
}
330
/// State for a single builtin schema migration run.
struct Migration {
    /// The version recorded in the durable catalog (migrating from).
    source_version: Version,
    /// The version of the current build (migrating to).
    target_version: Version,
    /// The current deploy generation.
    deploy_generation: u64,
    /// All builtin system objects, with their durable state.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// The shard used to durably record replacement shard IDs, so concurrent
    /// or restarted processes agree on them.
    migration_shard: ShardId,
    /// Migration configuration, including the persist client and the
    /// read-only flag.
    config: BuiltinItemMigrationConfig,
}
351
impl Migration {
    /// Executes the migration, running the applicable subset of `steps`.
    ///
    /// Returns the durable changes (new shards, updated fingerprints, shards
    /// to finalize) plus a deferred cleanup action.
    async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
        info!(
            deploy_generation = %self.deploy_generation,
            "running builtin schema migration: {} -> {}",
            self.source_version, self.target_version
        );

        self.validate_migration_steps(steps);

        // `force_migration` overrides the step list and migrates all
        // eligible builtins with a single mechanism.
        let (force, plan) = match self.config.force_migration.as_deref() {
            None => (false, self.plan_migration(steps)),
            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
            Some(other) => panic!("unknown force migration mechanism: {other}"),
        };

        if self.source_version == self.target_version && !force {
            info!("skipping migration: already at target version");
            return Ok(MigrationRunResult::default());
        } else if self.source_version > self.target_version {
            bail!("downgrade not supported");
        }

        // Upgrade the migration shard's persisted version to the target
        // build version. Read-only processes must not mutate the shard.
        if !self.config.read_only {
            self.upgrade_migration_shard_version().await;
        }

        info!("executing migration plan: {plan:?}");

        self.migrate_evolve(&plan.evolve).await?;
        let new_shards = self.migrate_replace(&plan.replace).await?;

        let mut migrated_objects = BTreeSet::new();
        migrated_objects.extend(plan.evolve);
        migrated_objects.extend(plan.replace);

        let new_fingerprints = self.update_fingerprints(&migrated_objects)?;

        let (shards_to_finalize, cleanup_action) = self.cleanup().await?;

        Ok(MigrationRunResult {
            new_shards,
            new_fingerprints,
            shards_to_finalize,
            cleanup_action,
        })
    }

    /// Panics if any of the given migration steps is invalid.
    ///
    /// A step is invalid if its version is greater than the target version,
    /// if it names `mz_storage_usage_by_shard` or `mz_catalog_raw`, if it
    /// names a builtin that doesn't exist, or if it names a builtin that is
    /// not a storage collection (table, source, or continual task).
    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
        for step in steps {
            assert!(
                step.version <= self.target_version,
                "migration step version greater than target version: {} > {}",
                step.version,
                self.target_version,
            );

            let object = SystemObjectDescription::from(step.object.clone());

            assert_ne!(
                *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
            );

            assert_ne!(
                *MZ_CATALOG_RAW_DESCRIPTION, object,
                "mz_catalog_raw cannot be migrated"
            );

            let Some(object_info) = self.system_objects.get(&object) else {
                panic!("migration step for non-existent builtin: {object:?}");
            };

            // Only storage-backed builtins have a shard whose schema could
            // be migrated.
            let builtin = object_info.builtin;
            use Builtin::*;
            assert!(
                matches!(builtin, Table(..) | Source(..) | ContinualTask(..)),
                "schema migration not supported for builtin: {builtin:?}",
            );
        }
    }

    /// Plans a migration from the given steps.
    ///
    /// Only steps newer than the source version apply. When multiple steps
    /// apply to the same object, `Replacement` wins over `Evolution`, so
    /// each object appears in the plan at most once.
    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
        let steps = steps.iter().filter(|s| s.version > self.source_version);

        // Deduplicate steps by object, merging their mechanisms.
        let mut by_object = BTreeMap::new();
        for step in steps {
            if let Some(entry) = by_object.get_mut(&step.object) {
                *entry = match (step.mechanism, *entry) {
                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
                        Mechanism::Replacement
                    }
                };
            } else {
                by_object.insert(step.object.clone(), step.mechanism);
            }
        }

        let mut plan = Plan::default();
        for (object, mechanism) in by_object {
            match mechanism {
                Mechanism::Evolution => plan.evolve.push(object.into()),
                Mechanism::Replacement => plan.replace.push(object.into()),
            }
        }

        plan
    }

    /// Plans a forced migration of all migratable builtin storage
    /// collections (tables, continual tasks, and sources except
    /// `mz_catalog_raw`) using the given mechanism.
    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
        let objects = self
            .system_objects
            .iter()
            .filter(|(_, info)| {
                use Builtin::*;
                match info.builtin {
                    Table(..) | ContinualTask(..) => true,
                    Source(source) => **source != *MZ_CATALOG_RAW,
                    Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
                }
            })
            .map(|(object, _)| object.clone())
            .collect();

        let mut plan = Plan::default();
        match mechanism {
            Mechanism::Evolution => plan.evolve = objects,
            Mechanism::Replacement => plan.replace = objects,
        }

        plan
    }

    /// Upgrades the persisted version of the migration shard to the target
    /// build version.
    async fn upgrade_migration_shard_version(&self) {
        let persist = &self.config.persist_client;
        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
        };

        persist
            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
                self.migration_shard,
                diagnostics,
            )
            .await
            .expect("valid usage");
    }

    /// Migrates the given objects by evolving their shard schemas in place.
    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
        for object in objects {
            self.migrate_evolve_one(object).await?;
        }
        Ok(())
    }

    /// Migrates a single object by evolving its shard schema in place.
    ///
    /// In read-only mode, only verifies that the evolution would be
    /// backward compatible; the shard is not modified.
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let persist = &self.config.persist_client;

        let Some(object_info) = self.system_objects.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let id = object_info.global_id;

        let Some(shard_id) = object_info.shard_id else {
            if self.config.read_only {
                // In read-only mode every builtin storage collection is
                // expected to already have a shard.
                bail!("missing shard ID for builtin {object:?} ({id})");
            } else {
                // No shard yet means no schema to evolve; presumably the
                // collection is created later with the new schema — confirm
                // with collection creation code.
                return Ok(());
            }
        };

        // The schema we want to end up with, from the builtin definition.
        let target_desc = match object_info.builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::ContinualTask(ct) => &ct.desc,
            _ => bail!("not a storage collection: {object:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        if self.config.read_only {
            // Only check that the registered schema can evolve into the
            // target schema; don't perform the evolution.
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                         {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        // Determine the schema to evolve from, registering the target schema
        // directly if the shard doesn't have one yet.
        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                // A `Some` ID means our registration of the target schema
                // succeeded, so there is nothing left to evolve.
                if schema_id.is_some() {
                    return Ok(());
                }

                // Registration didn't take (presumably a schema was
                // registered concurrently); fetch the latest schema and
                // evolve from there.
                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        // Evolve the schema via compare-and-evolve, retrying when the
        // expected schema was changed concurrently.
        loop {
            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                     {source_desc:?} -> {target_desc:?}"
                ),
                // The schema changed under us; retry from the new one.
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }

    /// Migrates the given objects by replacing their shards with new ones.
    ///
    /// The replacement shard IDs are recorded durably in the migration
    /// shard, keyed by target version and deploy generation, so concurrent
    /// or restarted processes observe the same replacements.
    async fn migrate_replace(
        &self,
        objects: &[SystemObjectDescription],
    ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
        if objects.is_empty() {
            return Ok(Default::default());
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;

        let mut ids_to_replace = BTreeSet::new();
        for object in objects {
            if let Some(info) = self.system_objects.get(object) {
                ids_to_replace.insert(info.global_id);
            } else {
                bail!("missing id for builtin {object:?}");
            }
        }

        info!(?objects, ?ids_to_replace, "migrating by replacement");

        // Retry until we either find existing entries or win the
        // compare-and-append race inserting our own.
        let replaced_shards = loop {
            if let Some(shards) = self
                .try_get_or_insert_replacement_shards(
                    &ids_to_replace,
                    &mut persist_write,
                    &mut persist_read,
                )
                .await?
            {
                break shards;
            }
        };

        Ok(replaced_shards)
    }

    /// Makes one attempt at reading or inserting replacement shard IDs for
    /// the given global IDs.
    ///
    /// Returns `Ok(None)` if a concurrent writer won the compare-and-append
    /// race, in which case the caller should retry.
    async fn try_get_or_insert_replacement_shards(
        &self,
        ids_to_replace: &BTreeSet<GlobalId>,
        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
        let upper = persist_write.fetch_recent_upper().await;
        let write_ts = *upper.as_option().expect("migration shard not sealed");

        let mut ids_to_replace = ids_to_replace.clone();
        let mut replaced_shards = BTreeMap::new();

        // First look for existing entries for our target version and deploy
        // generation, written by a previous or concurrent process. Reading
        // is only possible if the upper can be stepped back, i.e. the shard
        // has been written to before.
        if let Some(read_ts) = write_ts.step_back() {
            let pred = |key: &migration_shard::Key| {
                key.build_version == self.target_version
                    && key.deploy_generation == Some(self.deploy_generation)
            };
            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
                for (key, shard_id) in entries {
                    let id = GlobalId::System(key.global_id);
                    if ids_to_replace.remove(&id) {
                        replaced_shards.insert(id, shard_id);
                    }
                }

                debug!(
                    %read_ts, ?replaced_shards, ?ids_to_replace,
                    "found existing entries in migration shard",
                );
            }

            // All IDs resolved from existing entries; nothing to insert.
            if ids_to_replace.is_empty() {
                return Ok(Some(replaced_shards));
            }
        }

        // Allocate fresh shards for the remaining IDs and attempt to record
        // them in the migration shard.
        let mut updates = Vec::new();
        for id in ids_to_replace {
            let shard_id = ShardId::new();
            replaced_shards.insert(id, shard_id);

            let GlobalId::System(global_id) = id else {
                bail!("attempt to migrate a non-system collection: {id}");
            };
            let key = migration_shard::Key {
                global_id,
                build_version: self.target_version.clone(),
                deploy_generation: Some(self.deploy_generation),
            };
            updates.push(((key, shard_id), write_ts, 1));
        }

        let upper = Antichain::from_elem(write_ts);
        let new_upper = Antichain::from_elem(write_ts.step_forward());
        debug!(%write_ts, "attempting insert into migration shard");
        let result = persist_write
            .compare_and_append(updates, upper, new_upper)
            .await
            .expect("valid usage");

        match result {
            Ok(()) => {
                debug!(
                    %write_ts, ?replaced_shards,
                    "successfully inserted into migration shard"
                );
                Ok(Some(replaced_shards))
            }
            // A concurrent writer advanced the upper; let the caller retry
            // with a fresh upper.
            Err(_mismatch) => Ok(None),
        }
    }

    /// Opens write and read handles on the migration shard.
    async fn open_migration_shard(
        &self,
        diagnostics: Diagnostics,
    ) -> (
        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) {
        let persist = &self.config.persist_client;

        persist
            .open(
                self.migration_shard,
                Arc::new(migration_shard::KeySchema),
                Arc::new(ShardIdSchema),
                diagnostics,
                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
            )
            .await
            .expect("valid usage")
    }

    /// Opens a critical since handle on the migration shard, using the
    /// controller's critical reader ID.
    async fn open_migration_shard_since(
        &self,
        diagnostics: Diagnostics,
    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
        self.config
            .persist_client
            .open_critical_since(
                self.migration_shard,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Opaque::encode(&i64::MIN),
                diagnostics.clone(),
            )
            .await
            .expect("valid usage")
    }

    /// Computes updated fingerprints for builtins whose definition changed.
    ///
    /// A changed fingerprint is only accepted for objects that were migrated
    /// in this run or that are ephemeral (logs, views, indexes — not
    /// storage-backed collections). Runtime-alterable builtins must carry
    /// the sentinel fingerprint; any other mismatch panics.
    fn update_fingerprints(
        &self,
        migrated_items: &BTreeSet<SystemObjectDescription>,
    ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
        let mut new_fingerprints = BTreeMap::new();
        for (object, object_info) in &self.system_objects {
            let id = object_info.global_id;
            let builtin = object_info.builtin;

            // Unchanged fingerprint means an unchanged definition.
            let fingerprint = builtin.fingerprint();
            if fingerprint == object_info.fingerprint {
                continue;
            }

            let migrated = migrated_items.contains(object);
            let ephemeral = matches!(
                builtin,
                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
            );

            if migrated || ephemeral {
                new_fingerprints.insert(object.clone(), fingerprint);
            } else if builtin.runtime_alterable() {
                assert_eq!(
                    object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
                    "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
                );
            } else {
                panic!(
                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
                    fingerprint, object_info.fingerprint,
                );
            }
        }

        Ok(new_fingerprints)
    }

    /// Cleans up stale entries in the migration shard.
    ///
    /// Returns the shards from stale entries that are not in current use
    /// (candidates for finalization), plus a deferred action that retracts
    /// the stale entries; the retraction is handed back to the caller to run
    /// at a later point. Also downgrades the shard's critical since to the
    /// read time used here.
    async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
        let noop_action = async {}.boxed();
        let noop_result = (BTreeSet::new(), noop_action);

        // Read-only processes must not mutate the migration shard.
        if self.config.read_only {
            return Ok(noop_result);
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: "builtin schema migration cleanup".into(),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;
        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;

        // If the upper can't be stepped back, the shard was never written
        // to, so there is nothing to clean up.
        let upper = persist_write.fetch_recent_upper().await.clone();
        let write_ts = *upper.as_option().expect("migration shard not sealed");
        let Some(read_ts) = write_ts.step_back() else {
            return Ok(noop_result);
        };

        // Entries written by builds older than the target version are stale.
        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
        else {
            return Ok(noop_result);
        };

        debug!(
            ?stale_entries,
            "cleaning migration shard up to version {}", self.target_version,
        );

        let current_shards: BTreeMap<_, _> = self
            .system_objects
            .values()
            .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
            .collect();

        let mut shards_to_finalize = BTreeSet::new();
        let mut retractions = Vec::new();
        for (key, shard_id) in stale_entries {
            // Only finalize a shard that isn't the one currently in use for
            // the collection.
            let gid = GlobalId::System(key.global_id);
            if current_shards.get(&gid) != Some(&shard_id) {
                shards_to_finalize.insert(shard_id);
            }

            retractions.push(((key, shard_id), write_ts, -1));
        }

        // Deferred retraction of the stale entries. A compare-and-append
        // mismatch just means a concurrent writer got there first; it is
        // only logged, not treated as an error.
        let cleanup_action = async move {
            if !retractions.is_empty() {
                let new_upper = Antichain::from_elem(write_ts.step_forward());
                let result = persist_write
                    .compare_and_append(retractions, upper, new_upper)
                    .await
                    .expect("valid usage");
                match result {
                    Ok(()) => debug!("cleaned up migration shard"),
                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
                }
            }
        }
        .boxed();

        // Allow compaction of the shard up to our read time.
        let o = persist_since.opaque().clone();
        let new_since = Antichain::from_elem(read_ts);
        let result = persist_since
            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
            .await;
        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");

        Ok((shards_to_finalize, cleanup_action))
    }
}
981
982async fn read_migration_shard<P>(
988 persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
989 read_ts: Timestamp,
990 predicate: P,
991) -> Option<Vec<(migration_shard::Key, ShardId)>>
992where
993 P: for<'a> Fn(&migration_shard::Key) -> bool,
994{
995 let as_of = Antichain::from_elem(read_ts);
996 let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
997
998 assert!(
999 updates.iter().all(|(_, _, diff)| *diff == 1),
1000 "migration shard contains invalid diffs: {updates:?}",
1001 );
1002
1003 let entries: Vec<_> = updates
1004 .into_iter()
1005 .map(|(data, _, _)| data)
1006 .filter(move |(key, _)| predicate(key))
1007 .collect();
1008
1009 (!entries.is_empty()).then_some(entries)
1010}
1011
/// A planned builtin schema migration, grouping objects by mechanism.
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate by in-place schema evolution.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate by shard replacement.
    replace: Vec<SystemObjectDescription>,
}
1020
/// Key and schema types for the builtin migration shard, which durably maps
/// `(global_id, build_version, deploy_generation)` keys to replacement
/// `ShardId`s.
mod migration_shard {
    use std::fmt;
    use std::str::FromStr;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;
    use semver::Version;
    use serde::{Deserialize, Serialize};

    /// The key type of the migration shard.
    ///
    /// Keys are stored as strings (see the `Codec` impl). Two string
    /// encodings exist: JSON for keys that carry a deploy generation, and a
    /// legacy `"<global_id>-<build_version>"` format for keys without one.
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
    pub(super) struct Key {
        pub(super) global_id: u64,
        pub(super) build_version: Version,
        /// `None` for legacy keys that were written without a deploy
        /// generation; presumably kept optional so existing shard contents
        /// remain decodable — confirm before making this required.
        pub(super) deploy_generation: Option<u64>,
    }

    impl fmt::Display for Key {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            if self.deploy_generation.is_some() {
                // New-style keys are rendered as JSON.
                let s = serde_json::to_string(self).expect("JSON serializable");
                f.write_str(&s)
            } else {
                // Legacy keys use the original `<global_id>-<build_version>`
                // format.
                write!(f, "{}-{}", self.global_id, self.build_version)
            }
        }
    }

    impl FromStr for Key {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, String> {
            // Try the JSON encoding first, then fall back to the legacy
            // `<global_id>-<build_version>` encoding.
            if let Ok(key) = serde_json::from_str(s) {
                return Ok(key);
            };

            // Split at the first '-' only: the global ID (a u64) cannot
            // contain one, but a semver build version might.
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid Key '{s}'"));
            };
            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
            let build_version = build_version
                .parse::<Version>()
                .map_err(|e| e.to_string())?;
            Ok(Key {
                global_id,
                build_version,
                deploy_generation: None,
            })
        }
    }

    impl Default for Key {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: Version::new(0, 0, 0),
                deploy_generation: Some(0),
            }
        }
    }

    // Keys are encoded as their string representation (see `Display` /
    // `FromStr` above).
    impl Codec for Key {
        type Schema = KeySchema;
        type Storage = ();

        fn codec_name() -> String {
            // NOTE(review): the name doesn't match the type; presumably a
            // legacy name kept for compatibility with data already written —
            // confirm before changing.
            "TableKey".into()
        }

        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put(self.to_string().as_bytes())
        }

        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
            s.parse()
        }

        // The schema carries no state, so it encodes to nothing.
        fn encode_schema(_schema: &KeySchema) -> Bytes {
            Bytes::new()
        }

        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            KeySchema
        }
    }

    // Columnar representation: one string per key, round-tripped through
    // `Display` / `FromStr`.
    impl SimpleColumnarData for Key {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }

        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }

        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("valid Key");
        }
    }

    /// The persist schema for `Key`: a plain string column with no
    /// statistics.
    #[derive(Debug, PartialEq)]
    pub(super) struct KeySchema;

    impl Schema<Key> for KeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;
        type Decoder = SimpleColumnarDecoder<Key>;
        type Encoder = SimpleColumnarEncoder<Key>;

        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}
1162
// Unit tests live in a sibling file.
#[cfg(test)]
#[path = "builtin_schema_migration_tests.rs"]
mod tests;