1use std::collections::{BTreeMap, BTreeSet};
32use std::sync::Arc;
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39 BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION,
40 RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
41};
42use mz_catalog::config::BuiltinItemMigrationConfig;
43use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
44use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
45use mz_catalog::memory::error::{Error, ErrorKind};
46use mz_ore::soft_assert_or_log;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
48use mz_persist_client::critical::SinceHandle;
49use mz_persist_client::read::ReadHandle;
50use mz_persist_client::schema::CaESchema;
51use mz_persist_client::write::WriteHandle;
52use mz_persist_client::{Diagnostics, PersistClient};
53use mz_persist_types::ShardId;
54use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
55use mz_persist_types::schema::backward_compatible;
56use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
57use mz_repr::{CatalogItemId, GlobalId, Timestamp};
58use mz_sql::catalog::{CatalogItemType, NameReference};
59use mz_storage_client::controller::StorageTxn;
60use mz_storage_types::StorageDiff;
61use mz_storage_types::sources::SourceData;
62use semver::Version;
63use timely::progress::Antichain;
64use tracing::{debug, info};
65
66use crate::catalog::migrate::get_migration_version;
67
/// The builtin schema migration steps, listed in ascending version order.
///
/// Each step names a builtin storage collection and the mechanism used to migrate it when
/// upgrading from a catalog version older than the step's `version`. When one upgrade spans
/// several steps for the same object, `Migration::plan_migration` merges them: the object is
/// replaced if any of those steps requires `Replacement`, and evolved otherwise.
///
/// `Migration::validate_migration_steps` panics at migration time if a step's version is newer
/// than the running build, if it names a builtin that does not exist or is not a table, source,
/// or continual task, or if it names `mz_storage_usage_by_shard`.
const MIGRATIONS: &[MigrationStep] = &[
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_sink_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_source_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 159, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_cluster_replica_metrics_history",
        },
        mechanism: Mechanism::Evolution,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_roles",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_sinks",
        },
        mechanism: Mechanism::Replacement,
    },
];
122
/// A single entry in [`MIGRATIONS`]: migrate `object` using `mechanism` when upgrading from a
/// version older than `version`.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The version that introduced this step. The step is part of a migration plan when the
    /// durable catalog version is strictly older than this.
    version: Version,
    /// The builtin object to migrate.
    object: Object,
    /// How to migrate the object.
    mechanism: Mechanism,
}
133
/// The mechanism used to migrate a builtin storage collection.
///
/// The derived ordering places `Evolution` before `Replacement`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
// NOTE(review): the reason for allowing dead code is not visible here; presumably a variant
// can go unused when `MIGRATIONS` contains no step of that kind — confirm.
#[allow(dead_code)]
enum Mechanism {
    /// Evolve the schema of the existing persist shard (compare-and-evolve-schema); the shard
    /// and its contents are kept.
    Evolution,
    /// Switch the collection to a newly created shard; the old shard is recorded as
    /// unfinalized, so its contents are not carried over.
    Replacement,
}
148
/// Identifies a builtin object by item type, schema name, and object name.
///
/// This is a `'static`-string counterpart of [`SystemObjectDescription`] that can be used in
/// the `const` [`MIGRATIONS`] table; it converts into a description via `From`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Object {
    /// The catalog item type of the object.
    type_: CatalogItemType,
    /// The name of the schema containing the object.
    schema: &'static str,
    /// The name of the object.
    name: &'static str,
}
159
160impl From<Object> for SystemObjectDescription {
161 fn from(object: Object) -> Self {
162 SystemObjectDescription {
163 schema_name: object.schema.into(),
164 object_type: object.type_,
165 object_name: object.name.into(),
166 }
167 }
168}
169
/// The outcome of running the builtin schema migrations, as returned by [`run`].
pub(super) struct MigrationResult {
    /// Catalog IDs of the builtin items whose collections were migrated by replacement, i.e.
    /// switched over to new shards.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// A deferred action (for example, retracting stale entries from the migration shard) that
    /// the caller is responsible for awaiting.
    pub cleanup_action: BoxFuture<'static, ()>,
}
177
178impl Default for MigrationResult {
179 fn default() -> Self {
180 Self {
181 replaced_items: Default::default(),
182 cleanup_action: async {}.boxed(),
183 }
184 }
185}
186
187pub(super) async fn run(
193 build_info: &BuildInfo,
194 deploy_generation: u64,
195 txn: &mut Transaction<'_>,
196 config: BuiltinItemMigrationConfig,
197) -> Result<MigrationResult, Error> {
198 assert_eq!(config.read_only, txn.is_savepoint());
200
201 if *build_info == DUMMY_BUILD_INFO {
204 return Ok(MigrationResult::default());
205 }
206
207 let Some(durable_version) = get_migration_version(txn) else {
208 return Ok(MigrationResult::default());
210 };
211 let build_version = build_info.semver_version();
212
213 let collection_metadata = txn.get_collection_metadata();
214 let system_objects = txn
215 .get_system_object_mappings()
216 .map(|m| {
217 let object = m.description;
218 let global_id = m.unique_identifier.global_id;
219 let shard_id = collection_metadata.get(&global_id).copied();
220 let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
221 panic!("missing builtin {object:?}");
222 };
223 let info = ObjectInfo {
224 global_id,
225 shard_id,
226 builtin,
227 fingerprint: m.unique_identifier.fingerprint,
228 };
229 (object, info)
230 })
231 .collect();
232
233 let migration_shard = txn.get_builtin_migration_shard().expect("must exist");
234
235 let migration = Migration {
236 source_version: durable_version.clone(),
237 target_version: build_version.clone(),
238 deploy_generation,
239 system_objects,
240 migration_shard,
241 config,
242 };
243
244 let result = migration.run(MIGRATIONS).await.map_err(|e| {
245 Error::new(ErrorKind::FailedBuiltinSchemaMigration {
246 last_seen_version: durable_version.to_string(),
247 this_version: build_version.to_string(),
248 cause: e.to_string(),
249 })
250 })?;
251
252 result.apply(txn);
253
254 let replaced_items = txn
255 .get_system_object_mappings()
256 .map(|m| m.unique_identifier)
257 .filter(|ids| result.new_shards.contains_key(&ids.global_id))
258 .map(|ids| ids.catalog_id)
259 .collect();
260
261 Ok(MigrationResult {
262 replaced_items,
263 cleanup_action: result.cleanup_action,
264 })
265}
266
/// The raw outcome of [`Migration::run`], applied to a catalog transaction via
/// [`MigrationRunResult::apply`].
struct MigrationRunResult {
    /// The new shard for each collection migrated by replacement, keyed by global ID.
    new_shards: BTreeMap<GlobalId, ShardId>,
    /// New fingerprints for builtins whose definition changed and that were either migrated or
    /// are ephemeral (logs, views, indexes).
    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
    /// Shards referenced by stale migration shard entries that should be finalized.
    shards_to_finalize: BTreeSet<ShardId>,
    /// Deferred cleanup of the migration shard.
    cleanup_action: BoxFuture<'static, ()>,
}
274
275impl Default for MigrationRunResult {
276 fn default() -> Self {
277 Self {
278 new_shards: BTreeMap::new(),
279 new_fingerprints: BTreeMap::new(),
280 shards_to_finalize: BTreeSet::new(),
281 cleanup_action: async {}.boxed(),
282 }
283 }
284}
285
286impl MigrationRunResult {
287 fn apply(&self, txn: &mut Transaction<'_>) {
289 let replaced_ids = self.new_shards.keys().copied().collect();
291 let old_metadata = txn.delete_collection_metadata(replaced_ids);
292 txn.insert_collection_metadata(self.new_shards.clone())
293 .expect("inserting unique shards IDs after deleting existing entries");
294
295 let mut unfinalized_shards: BTreeSet<_> =
297 old_metadata.into_iter().map(|(_, sid)| sid).collect();
298 unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
299 txn.insert_unfinalized_shards(unfinalized_shards)
300 .expect("cannot fail");
301
302 let mappings = txn
304 .get_system_object_mappings()
305 .filter_map(|m| {
306 let fingerprint = self.new_fingerprints.get(&m.description)?;
307 Some(SystemObjectMapping {
308 description: m.description,
309 unique_identifier: SystemObjectUniqueIdentifier {
310 catalog_id: m.unique_identifier.catalog_id,
311 global_id: m.unique_identifier.global_id,
312 fingerprint: fingerprint.clone(),
313 },
314 })
315 })
316 .collect();
317 txn.set_system_object_mappings(mappings)
318 .expect("filtered existing mappings remain unique");
319 }
320}
321
/// Information about an existing builtin system object, gathered from the catalog transaction.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// The object's global ID.
    global_id: GlobalId,
    /// The shard backing the object, if the transaction has collection metadata for it.
    shard_id: Option<ShardId>,
    /// The object's builtin definition in the running build.
    builtin: &'static Builtin<NameReference>,
    /// The fingerprint recorded in the durable system object mapping.
    fingerprint: String,
}
330
/// State for running the builtin schema migrations of a single upgrade.
struct Migration {
    /// The migration version recorded in the durable catalog (the version being migrated from).
    source_version: Version,
    /// The version of the running build (the version being migrated to).
    target_version: Version,
    /// The deploy generation of this process; part of the keys written to the migration shard.
    deploy_generation: u64,
    /// Every builtin system object currently in the catalog, keyed by its description.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// The persist shard that records the replacement shards created by migrations.
    migration_shard: ShardId,
    /// Migration configuration: persist client, read-only flag, and optional forced mechanism.
    config: BuiltinItemMigrationConfig,
}
351
352impl Migration {
353 async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
354 info!(
355 deploy_generation = %self.deploy_generation,
356 "running builtin schema migration: {} -> {}",
357 self.source_version, self.target_version
358 );
359
360 self.validate_migration_steps(steps);
361
362 let (force, plan) = match self.config.force_migration.as_deref() {
363 None => (false, self.plan_migration(steps)),
364 Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
365 Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
366 Some(other) => panic!("unknown force migration mechanism: {other}"),
367 };
368
369 if self.source_version == self.target_version && !force {
370 info!("skipping migration: already at target version");
371 return Ok(MigrationRunResult::default());
372 } else if self.source_version > self.target_version {
373 bail!("downgrade not supported");
374 }
375
376 if !self.config.read_only {
379 self.upgrade_migration_shard_version().await;
380 }
381
382 info!("executing migration plan: {plan:?}");
383
384 self.migrate_evolve(&plan.evolve).await?;
385 let new_shards = self.migrate_replace(&plan.replace).await?;
386
387 let mut migrated_objects = BTreeSet::new();
388 migrated_objects.extend(plan.evolve);
389 migrated_objects.extend(plan.replace);
390
391 let new_fingerprints = self.update_fingerprints(&migrated_objects)?;
392
393 let (shards_to_finalize, cleanup_action) = self.cleanup().await?;
394
395 Ok(MigrationRunResult {
396 new_shards,
397 new_fingerprints,
398 shards_to_finalize,
399 cleanup_action,
400 })
401 }
402
403 fn validate_migration_steps(&self, steps: &[MigrationStep]) {
407 for step in steps {
408 assert!(
409 step.version <= self.target_version,
410 "migration step version greater than target version: {} > {}",
411 step.version,
412 self.target_version,
413 );
414
415 let object = SystemObjectDescription::from(step.object.clone());
416
417 assert_ne!(
425 *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
426 "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
427 );
428
429 let Some(object_info) = self.system_objects.get(&object) else {
430 panic!("migration step for non-existent builtin: {object:?}");
431 };
432
433 let builtin = object_info.builtin;
434 use Builtin::*;
435 assert!(
436 matches!(builtin, Table(..) | Source(..) | ContinualTask(..)),
437 "schema migration not supported for builtin: {builtin:?}",
438 );
439 }
440 }
441
442 fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
444 let steps = steps.iter().filter(|s| s.version > self.source_version);
446
447 let mut by_object = BTreeMap::new();
451 for step in steps {
452 if let Some(entry) = by_object.get_mut(&step.object) {
453 *entry = match (step.mechanism, *entry) {
454 (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
455 (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
456 Mechanism::Replacement
457 }
458 };
459 } else {
460 by_object.insert(step.object.clone(), step.mechanism);
461 }
462 }
463
464 let mut plan = Plan::default();
465 for (object, mechanism) in by_object {
466 match mechanism {
467 Mechanism::Evolution => plan.evolve.push(object.into()),
468 Mechanism::Replacement => plan.replace.push(object.into()),
469 }
470 }
471
472 plan
473 }
474
475 fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
477 let objects = self
478 .system_objects
479 .iter()
480 .filter(|(_, info)| matches!(info.builtin, Builtin::Table(..) | Builtin::Source(..)))
481 .map(|(object, _)| object.clone())
482 .collect();
483
484 let mut plan = Plan::default();
485 match mechanism {
486 Mechanism::Evolution => plan.evolve = objects,
487 Mechanism::Replacement => plan.replace = objects,
488 }
489
490 plan
491 }
492
493 async fn upgrade_migration_shard_version(&self) {
495 let persist = &self.config.persist_client;
496 let diagnostics = Diagnostics {
497 shard_name: "builtin_migration".to_string(),
498 handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
499 };
500
501 persist
502 .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
503 self.migration_shard,
504 diagnostics,
505 )
506 .await
507 .expect("valid usage");
508 }
509
510 async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
512 for object in objects {
513 self.migrate_evolve_one(object).await?;
514 }
515 Ok(())
516 }
517
518 async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
519 let persist = &self.config.persist_client;
520
521 let Some(object_info) = self.system_objects.get(object) else {
522 bail!("missing builtin {object:?}");
523 };
524 let id = object_info.global_id;
525
526 let Some(shard_id) = object_info.shard_id else {
527 if self.config.read_only {
532 bail!("missing shard ID for builtin {object:?} ({id})");
533 } else {
534 return Ok(());
535 }
536 };
537
538 let target_desc = match object_info.builtin {
539 Builtin::Table(table) => &table.desc,
540 Builtin::Source(source) => &source.desc,
541 Builtin::ContinualTask(ct) => &ct.desc,
542 _ => bail!("not a storage collection: {object:?}"),
543 };
544
545 let diagnostics = Diagnostics {
546 shard_name: id.to_string(),
547 handle_purpose: format!("builtin schema migration @ {}", self.target_version),
548 };
549 let source_schema = persist
550 .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
551 .await
552 .expect("valid usage");
553
554 info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");
555
556 if self.config.read_only {
557 if let Some((_, source_desc, _)) = &source_schema {
560 let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
561 let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
562 if backward_compatible(&old, &new).is_none() {
563 bail!(
564 "incompatible schema evolution for {object:?}: \
565 {source_desc:?} -> {target_desc:?}"
566 );
567 }
568 }
569
570 return Ok(());
571 }
572
573 let (mut schema_id, mut source_desc) = match source_schema {
574 Some((schema_id, source_desc, _)) => (schema_id, source_desc),
575 None => {
576 debug!(%id, %shard_id, "no previous schema found; registering initial one");
581 let schema_id = persist
582 .register_schema::<SourceData, (), Timestamp, StorageDiff>(
583 shard_id,
584 target_desc,
585 &UnitSchema,
586 diagnostics.clone(),
587 )
588 .await
589 .expect("valid usage");
590 if schema_id.is_some() {
591 return Ok(());
592 }
593
594 debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
595 let (schema_id, source_desc, _) = persist
596 .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
597 shard_id,
598 diagnostics.clone(),
599 )
600 .await
601 .expect("valid usage")
602 .expect("known to exist");
603
604 (schema_id, source_desc)
605 }
606 };
607
608 loop {
609 debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
614 let result = persist
615 .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
616 shard_id,
617 schema_id,
618 target_desc,
619 &UnitSchema,
620 diagnostics.clone(),
621 )
622 .await
623 .expect("valid usage");
624
625 match result {
626 CaESchema::Ok(schema_id) => {
627 debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
628 break;
629 }
630 CaESchema::Incompatible => bail!(
631 "incompatible schema evolution for {object:?}: \
632 {source_desc:?} -> {target_desc:?}"
633 ),
634 CaESchema::ExpectedMismatch {
635 schema_id: new_id,
636 key,
637 val: UnitSchema,
638 } => {
639 schema_id = new_id;
640 source_desc = key;
641 }
642 }
643 }
644
645 Ok(())
646 }
647
648 async fn migrate_replace(
650 &self,
651 objects: &[SystemObjectDescription],
652 ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
653 if objects.is_empty() {
654 return Ok(Default::default());
655 }
656
657 let diagnostics = Diagnostics {
658 shard_name: "builtin_migration".to_string(),
659 handle_purpose: format!("builtin schema migration @ {}", self.target_version),
660 };
661 let (mut persist_write, mut persist_read) =
662 self.open_migration_shard(diagnostics.clone()).await;
663
664 let mut ids_to_replace = BTreeSet::new();
665 for object in objects {
666 if let Some(info) = self.system_objects.get(object) {
667 ids_to_replace.insert(info.global_id);
668 } else {
669 bail!("missing id for builtin {object:?}");
670 }
671 }
672
673 info!(?objects, ?ids_to_replace, "migrating by replacement");
674
675 let replaced_shards = loop {
678 if let Some(shards) = self
679 .try_get_or_insert_replacement_shards(
680 &ids_to_replace,
681 &mut persist_write,
682 &mut persist_read,
683 )
684 .await?
685 {
686 break shards;
687 }
688 };
689
690 Ok(replaced_shards)
691 }
692
693 async fn try_get_or_insert_replacement_shards(
703 &self,
704 ids_to_replace: &BTreeSet<GlobalId>,
705 persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
706 persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
707 ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
708 let upper = persist_write.fetch_recent_upper().await;
709 let write_ts = *upper.as_option().expect("migration shard not sealed");
710
711 let mut ids_to_replace = ids_to_replace.clone();
712 let mut replaced_shards = BTreeMap::new();
713
714 if let Some(read_ts) = write_ts.step_back() {
722 let pred = |key: &migration_shard::Key| {
723 key.build_version == self.target_version
724 && key.deploy_generation == Some(self.deploy_generation)
725 };
726 if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
727 for (key, shard_id) in entries {
728 let id = GlobalId::System(key.global_id);
729 if ids_to_replace.remove(&id) {
730 replaced_shards.insert(id, shard_id);
731 }
732 }
733
734 debug!(
735 %read_ts, ?replaced_shards, ?ids_to_replace,
736 "found existing entries in migration shard",
737 );
738 }
739
740 if ids_to_replace.is_empty() {
741 return Ok(Some(replaced_shards));
742 }
743 }
744
745 let mut updates = Vec::new();
749 for id in ids_to_replace {
750 let shard_id = ShardId::new();
751 replaced_shards.insert(id, shard_id);
752
753 let GlobalId::System(global_id) = id else {
754 bail!("attempt to migrate a non-system collection: {id}");
755 };
756 let key = migration_shard::Key {
757 global_id,
758 build_version: self.target_version.clone(),
759 deploy_generation: Some(self.deploy_generation),
760 };
761 updates.push(((key, shard_id), write_ts, 1));
762 }
763
764 let upper = Antichain::from_elem(write_ts);
765 let new_upper = Antichain::from_elem(write_ts.step_forward());
766 debug!(%write_ts, "attempting insert into migration shard");
767 let result = persist_write
768 .compare_and_append(updates, upper, new_upper)
769 .await
770 .expect("valid usage");
771
772 match result {
773 Ok(()) => {
774 debug!(
775 %write_ts, ?replaced_shards,
776 "successfully inserted into migration shard"
777 );
778 Ok(Some(replaced_shards))
779 }
780 Err(_mismatch) => Ok(None),
781 }
782 }
783
784 async fn open_migration_shard(
786 &self,
787 diagnostics: Diagnostics,
788 ) -> (
789 WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
790 ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
791 ) {
792 let persist = &self.config.persist_client;
793
794 persist
795 .open(
796 self.migration_shard,
797 Arc::new(migration_shard::KeySchema),
798 Arc::new(ShardIdSchema),
799 diagnostics,
800 USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
801 )
802 .await
803 .expect("valid usage")
804 }
805
806 async fn open_migration_shard_since(
808 &self,
809 diagnostics: Diagnostics,
810 ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff, i64> {
811 self.config
812 .persist_client
813 .open_critical_since(
814 self.migration_shard,
815 PersistClient::CONTROLLER_CRITICAL_SINCE,
818 diagnostics.clone(),
819 )
820 .await
821 .expect("valid usage")
822 }
823
824 fn update_fingerprints(
829 &self,
830 migrated_items: &BTreeSet<SystemObjectDescription>,
831 ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
832 let mut new_fingerprints = BTreeMap::new();
833 for (object, object_info) in &self.system_objects {
834 let id = object_info.global_id;
835 let builtin = object_info.builtin;
836
837 let fingerprint = builtin.fingerprint();
838 if fingerprint == object_info.fingerprint {
839 continue; }
841
842 let migrated = migrated_items.contains(object);
844 let ephemeral = matches!(
846 builtin,
847 Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
848 );
849
850 if migrated || ephemeral {
851 new_fingerprints.insert(object.clone(), fingerprint);
852 } else if builtin.runtime_alterable() {
853 assert_eq!(
856 object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
857 "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
858 );
859 } else {
860 panic!(
861 "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
862 fingerprint, object_info.fingerprint,
863 );
864 }
865 }
866
867 Ok(new_fingerprints)
868 }
869
870 async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
887 let noop_action = async {}.boxed();
888 let noop_result = (BTreeSet::new(), noop_action);
889
890 if self.config.read_only {
891 return Ok(noop_result);
892 }
893
894 let diagnostics = Diagnostics {
895 shard_name: "builtin_migration".to_string(),
896 handle_purpose: "builtin schema migration cleanup".into(),
897 };
898 let (mut persist_write, mut persist_read) =
899 self.open_migration_shard(diagnostics.clone()).await;
900 let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;
901
902 let upper = persist_write.fetch_recent_upper().await.clone();
903 let write_ts = *upper.as_option().expect("migration shard not sealed");
904 let Some(read_ts) = write_ts.step_back() else {
905 return Ok(noop_result);
906 };
907
908 let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
910 let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
911 else {
912 return Ok(noop_result);
913 };
914
915 debug!(
916 ?stale_entries,
917 "cleaning migration shard up to version {}", self.target_version,
918 );
919
920 let current_shards: BTreeMap<_, _> = self
921 .system_objects
922 .values()
923 .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
924 .collect();
925
926 let mut shards_to_finalize = BTreeSet::new();
927 let mut retractions = Vec::new();
928 for (key, shard_id) in stale_entries {
929 let gid = GlobalId::System(key.global_id);
933 if current_shards.get(&gid) != Some(&shard_id) {
934 shards_to_finalize.insert(shard_id);
935 }
936
937 retractions.push(((key, shard_id), write_ts, -1));
938 }
939
940 let cleanup_action = async move {
941 if !retractions.is_empty() {
942 let new_upper = Antichain::from_elem(write_ts.step_forward());
943 let result = persist_write
944 .compare_and_append(retractions, upper, new_upper)
945 .await
946 .expect("valid usage");
947 match result {
948 Ok(()) => debug!("cleaned up migration shard"),
949 Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
950 }
951 }
952 }
953 .boxed();
954
955 let o = *persist_since.opaque();
957 let new_since = Antichain::from_elem(read_ts);
958 let result = persist_since
959 .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
960 .await;
961 soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");
962
963 Ok((shards_to_finalize, cleanup_action))
964 }
965}
966
967async fn read_migration_shard<P>(
973 persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
974 read_ts: Timestamp,
975 predicate: P,
976) -> Option<Vec<(migration_shard::Key, ShardId)>>
977where
978 P: for<'a> Fn(&migration_shard::Key) -> bool,
979{
980 let as_of = Antichain::from_elem(read_ts);
981 let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
982
983 assert!(
984 updates.iter().all(|(_, _, diff)| *diff == 1),
985 "migration shard contains invalid diffs: {updates:?}",
986 );
987
988 let entries: Vec<_> = updates
989 .into_iter()
990 .filter_map(|(data, _, _)| {
991 if let (Ok(key), Ok(val)) = data {
992 Some((key, val))
993 } else {
994 info!("skipping unreadable migration shard entry: {data:?}");
997 None
998 }
999 })
1000 .filter(move |(key, _)| predicate(key))
1001 .collect();
1002
1003 (!entries.is_empty()).then_some(entries)
1004}
1005
/// A migration plan: which builtin objects to evolve and which to replace.
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate by schema evolution.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate by replacement with a new shard.
    replace: Vec<SystemObjectDescription>,
}
1014
1015mod migration_shard {
1017 use std::fmt;
1018 use std::str::FromStr;
1019
1020 use arrow::array::{StringArray, StringBuilder};
1021 use bytes::{BufMut, Bytes};
1022 use mz_persist_types::Codec;
1023 use mz_persist_types::codec_impls::{
1024 SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
1025 };
1026 use mz_persist_types::columnar::Schema;
1027 use mz_persist_types::stats::NoneStats;
1028 use semver::Version;
1029 use serde::{Deserialize, Serialize};
1030
1031 #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
1032 pub(super) struct Key {
1033 pub(super) global_id: u64,
1034 pub(super) build_version: Version,
1035 pub(super) deploy_generation: Option<u64>,
1039 }
1040
1041 impl fmt::Display for Key {
1042 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1043 if self.deploy_generation.is_some() {
1044 let s = serde_json::to_string(self).expect("JSON serializable");
1046 f.write_str(&s)
1047 } else {
1048 write!(f, "{}-{}", self.global_id, self.build_version)
1050 }
1051 }
1052 }
1053
1054 impl FromStr for Key {
1055 type Err = String;
1056
1057 fn from_str(s: &str) -> Result<Self, String> {
1058 if let Ok(key) = serde_json::from_str(s) {
1060 return Ok(key);
1061 };
1062
1063 let parts: Vec<_> = s.splitn(2, '-').collect();
1065 let &[global_id, build_version] = parts.as_slice() else {
1066 return Err(format!("invalid Key '{s}'"));
1067 };
1068 let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
1069 let build_version = build_version
1070 .parse::<Version>()
1071 .map_err(|e| e.to_string())?;
1072 Ok(Key {
1073 global_id,
1074 build_version,
1075 deploy_generation: None,
1076 })
1077 }
1078 }
1079
1080 impl Default for Key {
1081 fn default() -> Self {
1082 Self {
1083 global_id: Default::default(),
1084 build_version: Version::new(0, 0, 0),
1085 deploy_generation: Some(0),
1086 }
1087 }
1088 }
1089
1090 impl Codec for Key {
1091 type Schema = KeySchema;
1092 type Storage = ();
1093
1094 fn codec_name() -> String {
1095 "TableKey".into()
1096 }
1097
1098 fn encode<B: BufMut>(&self, buf: &mut B) {
1099 buf.put(self.to_string().as_bytes())
1100 }
1101
1102 fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
1103 let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
1104 s.parse()
1105 }
1106
1107 fn encode_schema(_schema: &KeySchema) -> Bytes {
1108 Bytes::new()
1109 }
1110
1111 fn decode_schema(buf: &Bytes) -> Self::Schema {
1112 assert_eq!(*buf, Bytes::new());
1113 KeySchema
1114 }
1115 }
1116
1117 impl SimpleColumnarData for Key {
1118 type ArrowBuilder = StringBuilder;
1119 type ArrowColumn = StringArray;
1120
1121 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
1122 builder.values_slice().len()
1123 }
1124
1125 fn push(&self, builder: &mut Self::ArrowBuilder) {
1126 builder.append_value(&self.to_string());
1127 }
1128
1129 fn push_null(builder: &mut Self::ArrowBuilder) {
1130 builder.append_null();
1131 }
1132
1133 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
1134 *self = column.value(idx).parse().expect("valid Key");
1135 }
1136 }
1137
1138 #[derive(Debug, PartialEq)]
1139 pub(super) struct KeySchema;
1140
1141 impl Schema<Key> for KeySchema {
1142 type ArrowColumn = StringArray;
1143 type Statistics = NoneStats;
1144 type Decoder = SimpleColumnarDecoder<Key>;
1145 type Encoder = SimpleColumnarEncoder<Key>;
1146
1147 fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
1148 Ok(SimpleColumnarEncoder::default())
1149 }
1150
1151 fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
1152 Ok(SimpleColumnarDecoder::new(col))
1153 }
1154 }
1155}
1156
// Unit tests for this module live in a separate file.
#[cfg(test)]
#[path = "builtin_schema_migration_tests.rs"]
mod tests;