//! Migration of the schemas of builtin storage collections across versions,
//! either by evolving a collection's persist schema in place or by replacing
//! its backing persist shard.

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use anyhow::bail;
use futures::FutureExt;
use futures::future::BoxFuture;
use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
use mz_catalog::builtin::{
    BUILTINS_STATIC, Builtin, Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION,
    RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
};
use mz_catalog::config::BuiltinItemMigrationConfig;
use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_ore::soft_assert_or_log;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient};
use mz_persist_types::ShardId;
use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
use mz_persist_types::schema::backward_compatible;
use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::catalog::{CatalogItemType, NameReference};
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::StorageDiff;
use mz_storage_types::sources::SourceData;
use semver::Version;
use timely::progress::Antichain;
use tracing::{debug, info};

use crate::catalog::migrate::get_migration_version;

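/// The list of migration steps to apply to builtin storage collections. Only
/// steps newer than the durably recorded catalog version are applied.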
const MIGRATIONS: &[MigrationStep] = &[
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_sink_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_source_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 159, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_cluster_replica_metrics_history",
        },
        mechanism: Mechanism::Evolution,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_roles",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_sinks",
        },
        mechanism: Mechanism::Replacement,
    },
];

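/// A single migration step, changing the schema of one builtin object.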
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The version in which this step was introduced.
    version: Version,
    /// The builtin object to migrate.
    object: Object,
    /// The mechanism by which to migrate the object.
    mechanism: Mechanism,
}

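/// A mechanism by which a builtin object can be migrated.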
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[allow(dead_code)]
enum Mechanism {
    /// Evolve the schema of the existing persist shard in place, keeping its
    /// contents.
    Evolution,
    /// Replace the existing persist shard with a new, empty one.
    Replacement,
}

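/// Identifies a builtin object by its type, schema, and name.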
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Object {
    type_: CatalogItemType,
    schema: &'static str,
    name: &'static str,
}

impl From<Object> for SystemObjectDescription {
    fn from(object: Object) -> Self {
        SystemObjectDescription {
            schema_name: object.schema.into(),
            object_type: object.type_,
            object_name: object.name.into(),
        }
    }
}

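/// The outcome of a builtin schema migration.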
pub(super) struct MigrationResult {
    /// The IDs of builtin items whose backing persist shards were replaced.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// A cleanup action to run once the migration has been applied.
    pub cleanup_action: BoxFuture<'static, ()>,
}

impl Default for MigrationResult {
    fn default() -> Self {
        Self {
            replaced_items: Default::default(),
            cleanup_action: async {}.boxed(),
        }
    }
}

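/// Runs all applicable builtin schema migrations against the given catalog
/// transaction.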
pub(super) async fn run(
    build_info: &BuildInfo,
    deploy_generation: u64,
    txn: &mut Transaction<'_>,
    config: BuiltinItemMigrationConfig,
) -> Result<MigrationResult, Error> {
    assert_eq!(config.read_only, txn.is_savepoint());

    // Dummy builds have no meaningful version; there is nothing to migrate.
    if *build_info == DUMMY_BUILD_INFO {
        return Ok(MigrationResult::default());
    }

    let Some(durable_version) = get_migration_version(txn) else {
        return Ok(MigrationResult::default());
    };
    let build_version = build_info.semver_version();

    let builtins = BUILTINS_STATIC
        .iter()
        .map(|builtin| {
            let object = SystemObjectDescription {
                schema_name: builtin.schema().to_string(),
                object_type: builtin.catalog_item_type(),
                object_name: builtin.name().to_string(),
            };
            (object, builtin)
        })
        .collect();

    let migration = Migration::new(
        durable_version.clone(),
        build_version.clone(),
        deploy_generation,
        txn,
        builtins,
        config,
    );

    let result = migration.run(MIGRATIONS).await.map_err(|e| {
        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
            last_seen_version: durable_version.to_string(),
            this_version: build_version.to_string(),
            cause: e.to_string(),
        })
    })?;

    Ok(result)
}

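/// State and helpers for a single builtin schema migration run.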
struct Migration<'a, 'b> {
    /// The catalog version recorded in the durable catalog; the version we
    /// are migrating from.
    source_version: Version,
    /// The version of the running build; the version we are migrating to.
    target_version: Version,
    /// The current deploy generation.
    deploy_generation: u64,
    txn: &'a mut Transaction<'b>,
    builtins: BTreeMap<SystemObjectDescription, &'static Builtin<NameReference>>,
    object_ids: BTreeMap<SystemObjectDescription, SystemObjectUniqueIdentifier>,
    config: BuiltinItemMigrationConfig,
}

impl<'a, 'b> Migration<'a, 'b> {
    fn new(
        source_version: Version,
        target_version: Version,
        deploy_generation: u64,
        txn: &'a mut Transaction<'b>,
        builtins: BTreeMap<SystemObjectDescription, &'static Builtin<NameReference>>,
        config: BuiltinItemMigrationConfig,
    ) -> Self {
        let object_ids = txn
            .get_system_object_mappings()
            .map(|m| (m.description, m.unique_identifier))
            .collect();

        Self {
            source_version,
            target_version,
            deploy_generation,
            txn,
            builtins,
            object_ids,
            config,
        }
    }

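    /// Runs the migration, applying all steps that are newer than the source
    /// version, or a forced plan if one is configured.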
    async fn run(mut self, steps: &[MigrationStep]) -> anyhow::Result<MigrationResult> {
        info!(
            deploy_generation = %self.deploy_generation,
            "running builtin schema migration: {} -> {}",
            self.source_version, self.target_version
        );

        self.validate_migration_steps(steps);

        let (force, plan) = match self.config.force_migration.as_deref() {
            None => (false, self.plan_migration(steps)),
            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
            Some(other) => panic!("unknown force migration mechanism: {other}"),
        };

        if self.source_version == self.target_version && !force {
            info!("skipping migration: already at target version");
            return Ok(Default::default());
        } else if self.source_version > self.target_version {
            bail!("downgrade not supported");
        }

        // Upgrade the migration shard's version before we use it below.
        if !self.config.read_only {
            self.upgrade_migration_shard_version().await;
        }

        info!("executing migration plan: {plan:?}");

        self.migrate_evolve(&plan.evolve).await?;
        self.migrate_replace(&plan.replace).await?;

        let mut migrated_items = BTreeSet::new();
        let mut replaced_items = BTreeSet::new();
        for object in &plan.evolve {
            let id = self.object_ids[object].catalog_id;
            migrated_items.insert(id);
        }
        for object in &plan.replace {
            let id = self.object_ids[object].catalog_id;
            migrated_items.insert(id);
            replaced_items.insert(id);
        }

        self.update_fingerprints(&migrated_items)?;

        let cleanup_action = self.cleanup().await?;

        Ok(MigrationResult {
            replaced_items,
            cleanup_action,
        })
    }

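    /// Panics if any of the given migration steps is invalid, e.g. because it
    /// targets a builtin that doesn't exist or cannot be migrated.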
    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
        for step in steps {
            assert!(
                step.version <= self.target_version,
                "migration step version greater than target version: {} > {}",
                step.version,
                self.target_version,
            );

            let object = SystemObjectDescription::from(step.object.clone());

            assert_ne!(
                *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
            );

            let Some(builtin) = self.builtins.get(&object) else {
                panic!("migration step for non-existent builtin: {object:?}");
            };

            use Builtin::*;
            assert!(
                matches!(builtin, Table(..) | Source(..) | ContinualTask(..)),
                "schema migration not supported for builtin: {builtin:?}",
            );
        }
    }

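    /// Computes the migration plan: steps not newer than the source version
    /// are dropped and multiple steps targeting the same object are merged.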
    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
        let steps = steps.iter().filter(|s| s.version > self.source_version);

        // Merge the mechanisms of steps that target the same object. A
        // `Replacement` subsumes any `Evolution`.
        let mut by_object = BTreeMap::new();
        for step in steps {
            if let Some(entry) = by_object.get_mut(&step.object) {
                *entry = match (step.mechanism, *entry) {
                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
                        Mechanism::Replacement
                    }
                };
            } else {
                by_object.insert(step.object.clone(), step.mechanism);
            }
        }

        let mut plan = Plan::default();
        for (object, mechanism) in by_object {
            match mechanism {
                Mechanism::Evolution => plan.evolve.push(object.into()),
                Mechanism::Replacement => plan.replace.push(object.into()),
            }
        }

        plan
    }

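    /// Computes a plan that forcibly migrates all builtin tables and sources
    /// using the given mechanism.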
    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
        let objects = self
            .builtins
            .iter()
            .filter(|(_, builtin)| matches!(builtin, Builtin::Table(..) | Builtin::Source(..)))
            .map(|(object, _)| object.clone())
            .collect();

        let mut plan = Plan::default();
        match mechanism {
            Mechanism::Evolution => plan.evolve = objects,
            Mechanism::Replacement => plan.replace = objects,
        }

        plan
    }

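    /// Upgrades the persist version of the migration shard to the target
    /// version.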
    async fn upgrade_migration_shard_version(&self) {
        let persist = &self.config.persist_client;
        let shard_id = self.txn.get_builtin_migration_shard().expect("must exist");
        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
        };

        persist
            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
                shard_id,
                diagnostics,
            )
            .await
            .expect("valid usage");
    }

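    /// Migrates the given objects by evolving the schemas of their persist
    /// shards in place.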
    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
        for object in objects {
            self.migrate_evolve_one(object).await?;
        }
        Ok(())
    }

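    /// Migrates a single object by evolving the schema of its persist shard.
    ///
    /// In read-only mode, only validates that the evolution would succeed.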
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let collection_metadata = self.txn.get_collection_metadata();
        let persist = &self.config.persist_client;

        let Some(builtin) = self.builtins.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let Some(id) = self.object_ids.get(object).map(|i| i.global_id) else {
            bail!("missing id for builtin {object:?}");
        };
        let Some(&shard_id) = collection_metadata.get(&id) else {
            if self.config.read_only {
                bail!("missing collection metadata for builtin {object:?} ({id})");
            } else {
                return Ok(());
            }
        };

        let target_desc = match builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::ContinualTask(ct) => &ct.desc,
            _ => bail!("not a storage collection: {builtin:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        if self.config.read_only {
            // In read-only mode, only validate that the evolution would be
            // backward compatible; don't change the registered schema.
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                         {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                // If registration succeeded, the shard's schema is now the
                // target schema and we are done.
                if schema_id.is_some() {
                    return Ok(());
                }

                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        loop {
            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                     {source_desc:?} -> {target_desc:?}"
                ),
                // The schema changed concurrently; retry with the new
                // schema ID.
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }

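    /// Migrates the given objects by replacing their persist shards with new,
    /// empty ones, marking the old shards as unfinalized.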
    async fn migrate_replace(&mut self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
        if objects.is_empty() {
            return Ok(());
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;

        let mut ids_to_replace = BTreeSet::new();
        for object in objects {
            if let Some(ids) = self.object_ids.get(object) {
                ids_to_replace.insert(ids.global_id);
            } else {
                bail!("missing id for builtin {object:?}");
            }
        }

        info!(?objects, ?ids_to_replace, "migrating by replacement");

        // Retry until we either find existing replacement shards or manage
        // to insert new ones without racing a concurrent writer.
        let replaced_shards = loop {
            if let Some(shards) = self
                .try_get_or_insert_replacement_shards(
                    &ids_to_replace,
                    &mut persist_write,
                    &mut persist_read,
                )
                .await?
            {
                break shards;
            }
        };

        let old = self.txn.delete_collection_metadata(ids_to_replace);
        let old_shards = old.into_iter().map(|(_, shard_id)| shard_id).collect();
        self.txn.insert_unfinalized_shards(old_shards)?;
        self.txn.insert_collection_metadata(replaced_shards)?;

        Ok(())
    }

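    /// Attempts to look up replacement shards for the given IDs in the
    /// migration shard, minting and recording new shard IDs for any that are
    /// missing.
    ///
    /// Returns `None` if the write to the migration shard raced a concurrent
    /// writer, in which case the caller should retry.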
    async fn try_get_or_insert_replacement_shards(
        &self,
        ids_to_replace: &BTreeSet<GlobalId>,
        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
        let upper = persist_write.fetch_recent_upper().await;
        let write_ts = *upper.as_option().expect("migration shard not sealed");

        let mut ids_to_replace = ids_to_replace.clone();
        let mut replaced_shards = BTreeMap::new();

        // First look for entries previously written for the same target
        // version and deploy generation, so that restarts reuse the same
        // replacement shards instead of minting new ones.
        if let Some(read_ts) = write_ts.step_back() {
            let pred = |key: &migration_shard::Key| {
                key.build_version == self.target_version
                    && key.deploy_generation == Some(self.deploy_generation)
            };
            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
                for (key, shard_id) in entries {
                    let id = GlobalId::System(key.global_id);
                    if ids_to_replace.remove(&id) {
                        replaced_shards.insert(id, shard_id);
                    }
                }

                debug!(
                    %read_ts, ?replaced_shards, ?ids_to_replace,
                    "found existing entries in migration shard",
                );
            }

            if ids_to_replace.is_empty() {
                return Ok(Some(replaced_shards));
            }
        }

        // Mint new shards for the remaining IDs and record them in the
        // migration shard.
        let mut updates = Vec::new();
        for id in ids_to_replace {
            let shard_id = ShardId::new();
            replaced_shards.insert(id, shard_id);

            let GlobalId::System(global_id) = id else {
                bail!("attempt to migrate a non-system collection: {id}");
            };
            let key = migration_shard::Key {
                global_id,
                build_version: self.target_version.clone(),
                deploy_generation: Some(self.deploy_generation),
            };
            updates.push(((key, shard_id), write_ts, 1));
        }

        let upper = Antichain::from_elem(write_ts);
        let new_upper = Antichain::from_elem(write_ts.step_forward());
        debug!(%write_ts, "attempting insert into migration shard");
        let result = persist_write
            .compare_and_append(updates, upper, new_upper)
            .await
            .expect("valid usage");

        match result {
            Ok(()) => {
                debug!(
                    %write_ts, ?replaced_shards,
                    "successfully inserted into migration shard"
                );
                Ok(Some(replaced_shards))
            }
            Err(_mismatch) => Ok(None),
        }
    }

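    /// Opens write and read handles to the migration shard.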
    async fn open_migration_shard(
        &self,
        diagnostics: Diagnostics,
    ) -> (
        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) {
        let persist = &self.config.persist_client;
        let shard_id = self.txn.get_builtin_migration_shard().expect("must exist");

        persist
            .open(
                shard_id,
                Arc::new(migration_shard::KeySchema),
                Arc::new(ShardIdSchema),
                diagnostics,
                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
            )
            .await
            .expect("valid usage")
    }

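    /// Opens a critical since handle to the migration shard.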
    async fn open_migration_shard_since(
        &self,
        diagnostics: Diagnostics,
    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff, i64> {
        let persist = &self.config.persist_client;
        let shard_id = self.txn.get_builtin_migration_shard().expect("must exist");

        persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                diagnostics.clone(),
            )
            .await
            .expect("valid usage")
    }

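    /// Updates the durably recorded fingerprints of builtins whose
    /// fingerprints have changed, panicking on changes that cannot be
    /// explained by a migration.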
    fn update_fingerprints(
        &mut self,
        migrated_items: &BTreeSet<CatalogItemId>,
    ) -> anyhow::Result<()> {
        let mut updates = BTreeMap::new();
        for (object, ids) in &self.object_ids {
            let Some(builtin) = self.builtins.get(object) else {
                bail!("missing builtin {object:?}");
            };

            let id = ids.catalog_id;
            let fingerprint = builtin.fingerprint();
            if fingerprint == ids.fingerprint {
                continue;
            }

            let migrated = migrated_items.contains(&id);
            // Logs, views, and indexes are ephemeral, so their fingerprints
            // may change without a schema migration.
            let ephemeral = matches!(
                builtin,
                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
            );

            if migrated || ephemeral {
                let new_mapping = SystemObjectMapping {
                    description: object.clone(),
                    unique_identifier: SystemObjectUniqueIdentifier {
                        catalog_id: ids.catalog_id,
                        global_id: ids.global_id,
                        fingerprint,
                    },
                };
                updates.insert(id, new_mapping);
            } else if builtin.runtime_alterable() {
                assert_eq!(
                    ids.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
                    "fingerprint mismatch for runtime-alterable builtin {builtin:?} ({id})",
                );
            } else {
                panic!(
                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
                    fingerprint, ids.fingerprint,
                );
            }
        }

        self.txn.update_system_object_mappings(updates)?;

        Ok(())
    }

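    /// Cleans up state left behind by previous migrations: migration shard
    /// entries for older versions are retracted, and replacement shards that
    /// never became the active shard of their collection are marked as
    /// unfinalized.
    ///
    /// The retractions are returned as an asynchronous cleanup action rather
    /// than applied immediately.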
    async fn cleanup(&mut self) -> anyhow::Result<BoxFuture<'static, ()>> {
        let noop_action = async {}.boxed();

        if self.config.read_only {
            return Ok(noop_action);
        }

        let collection_metadata = self.txn.get_collection_metadata();
        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: "builtin schema migration cleanup".into(),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;
        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;

        let upper = persist_write.fetch_recent_upper().await.clone();
        let write_ts = *upper.as_option().expect("migration shard not sealed");
        let Some(read_ts) = write_ts.step_back() else {
            return Ok(noop_action);
        };

        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
        else {
            return Ok(noop_action);
        };

        debug!(
            ?stale_entries,
            "cleaning migration shard up to version {}", self.target_version,
        );

        let mut unfinalized_shards = BTreeSet::new();
        let mut retractions = Vec::new();
        for (key, shard_id) in stale_entries {
            // Shards that never became the active shard of their collection
            // must still be finalized, lest they leak.
            let gid = GlobalId::System(key.global_id);
            if collection_metadata.get(&gid) != Some(&shard_id) {
                unfinalized_shards.insert(shard_id);
            }

            retractions.push(((key, shard_id), write_ts, -1));
        }

        self.txn.insert_unfinalized_shards(unfinalized_shards)?;

        let cleanup_action = async move {
            if !retractions.is_empty() {
                let new_upper = Antichain::from_elem(write_ts.step_forward());
                let result = persist_write
                    .compare_and_append(retractions, upper, new_upper)
                    .await
                    .expect("valid usage");
                match result {
                    Ok(()) => debug!("cleaned up migration shard"),
                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
                }
            }
        }
        .boxed();

        // Allow the migration shard to be compacted up to the read timestamp.
        let o = *persist_since.opaque();
        let new_since = Antichain::from_elem(read_ts);
        let result = persist_since
            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
            .await;
        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");

        Ok(cleanup_action)
    }
}

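/// Reads the contents of the migration shard at `read_ts`, returning all
/// entries that match the given predicate.
///
/// Returns `None` if the shard is not readable at `read_ts` or if no entries
/// match.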
async fn read_migration_shard<P>(
    persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    read_ts: Timestamp,
    predicate: P,
) -> Option<Vec<(migration_shard::Key, ShardId)>>
where
    P: Fn(&migration_shard::Key) -> bool,
{
    let as_of = Antichain::from_elem(read_ts);
    let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;

    assert!(
        updates.iter().all(|(_, _, diff)| *diff == 1),
        "migration shard contains invalid diffs: {updates:?}",
    );

    let entries: Vec<_> = updates
        .into_iter()
        .filter_map(|(data, _, _)| match data {
            (Ok(key), Ok(val)) => Some((key, val)),
            data => {
                info!("skipping unreadable migration shard entry: {data:?}");
                None
            }
        })
        .filter(move |(key, _)| predicate(key))
        .collect();

    (!entries.is_empty()).then_some(entries)
}

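/// A plan describing which objects to migrate by which mechanism.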
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate by schema evolution.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate by shard replacement.
    replace: Vec<SystemObjectDescription>,
}

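/// The key/value types and persist schemas of the builtin migration shard.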
mod migration_shard {
    use std::fmt;
    use std::str::FromStr;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;
    use semver::Version;
    use serde::{Deserialize, Serialize};

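    /// The key of a migration shard entry: the global ID of a migrated
    /// collection, along with the build version and deploy generation of the
    /// migration that minted its replacement shard.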
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
    pub(super) struct Key {
        pub(super) global_id: u64,
        pub(super) build_version: Version,
        /// `None` for legacy entries written before the deploy generation was
        /// included in the key.
        pub(super) deploy_generation: Option<u64>,
    }

    impl fmt::Display for Key {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            if self.deploy_generation.is_some() {
                let s = serde_json::to_string(self).expect("JSON serializable");
                f.write_str(&s)
            } else {
                // Legacy format, kept so that old entries round-trip.
                write!(f, "{}-{}", self.global_id, self.build_version)
            }
        }
    }

    impl FromStr for Key {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, String> {
            if let Ok(key) = serde_json::from_str(s) {
                return Ok(key);
            }

            // Fall back to the legacy `<global_id>-<build_version>` format.
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid Key '{s}'"));
            };
            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
            let build_version = build_version
                .parse::<Version>()
                .map_err(|e| e.to_string())?;
            Ok(Key {
                global_id,
                build_version,
                deploy_generation: None,
            })
        }
    }

    impl Default for Key {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: Version::new(0, 0, 0),
                deploy_generation: Some(0),
            }
        }
    }

    impl Codec for Key {
        type Schema = KeySchema;
        type Storage = ();

        fn codec_name() -> String {
            "TableKey".into()
        }

        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put(self.to_string().as_bytes())
        }

        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
            s.parse()
        }

        fn encode_schema(_schema: &KeySchema) -> Bytes {
            Bytes::new()
        }

        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            KeySchema
        }
    }

    impl SimpleColumnarData for Key {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }

        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }

        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("valid Key");
        }
    }

    #[derive(Debug, PartialEq)]
    pub(super) struct KeySchema;

    impl Schema<Key> for KeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;
        type Decoder = SimpleColumnarDecoder<Key>;
        type Encoder = SimpleColumnarEncoder<Key>;

        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}