mz_adapter/catalog/open/
builtin_schema_migration.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for migrating the schemas of builtin storage collections.
11//!
12//! If a version upgrade changes the schema of a builtin collection that's made durable in persist,
13//! that persist shard's schema must be migrated accordingly. The migration must happen in a way
14//! that's compatible with 0dt upgrades: Read-only environments need to be able to read the
15//! collections with the new schema, without interfering with the leader environment's continued
16//! use of the old schema.
17//!
18//! Two migration mechanisms are provided:
19//!
20//!  * [`Mechanism::Evolution`] uses persist's schema evolution support to evolve the persist
21//!    shard's schema in-place. Only works for backward-compatible changes.
22//!  * [`Mechanism::Replacement`] creates a new shard to serve the builtin collection in the new
23//!    version. Works for all schema changes but discards existing data.
24//!
25//! Which mechanism to use is selected through entries in the `MIGRATIONS` list. In general, the
26//! `Evolution` mechanism should be used when possible, as it avoids data loss.
27//!
28//! For more context and details on the implementation, see
29//! `doc/developer/design/20251015_builtin_schema_migration.md`.
30
31use std::collections::{BTreeMap, BTreeSet};
32use std::sync::Arc;
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39    BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION,
40    RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
41};
42use mz_catalog::config::BuiltinItemMigrationConfig;
43use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
44use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
45use mz_catalog::memory::error::{Error, ErrorKind};
46use mz_ore::soft_assert_or_log;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
48use mz_persist_client::critical::SinceHandle;
49use mz_persist_client::read::ReadHandle;
50use mz_persist_client::schema::CaESchema;
51use mz_persist_client::write::WriteHandle;
52use mz_persist_client::{Diagnostics, PersistClient};
53use mz_persist_types::ShardId;
54use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
55use mz_persist_types::schema::backward_compatible;
56use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
57use mz_repr::{CatalogItemId, GlobalId, Timestamp};
58use mz_sql::catalog::{CatalogItemType, NameReference};
59use mz_storage_client::controller::StorageTxn;
60use mz_storage_types::StorageDiff;
61use mz_storage_types::sources::SourceData;
62use semver::Version;
63use timely::progress::Antichain;
64use tracing::{debug, info};
65
66use crate::catalog::migrate::get_migration_version;
67
/// Builtin schema migrations required to upgrade to the current build version.
///
/// Each entry names a builtin object, the build version that requires the migration, and the
/// [`Mechanism`] to use for migrating the object.
///
/// Migration steps for old versions must be retained according to the upgrade policy. For
/// example, if we support upgrading one major version at a time, the release of version `N.0.0`
/// can delete all migration steps with versions before `(N-1).0.0`.
///
/// Smallest supported version: 0.147.0
const MIGRATIONS: &[MigrationStep] = &[
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_sink_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_source_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 159, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_cluster_replica_metrics_history",
        },
        mechanism: Mechanism::Evolution,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_roles",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_sinks",
        },
        mechanism: Mechanism::Replacement,
    },
];
122
/// A migration required to upgrade past a specific version.
///
/// When planning a (non-forced) migration, a step is selected if its `version` is greater than
/// the version being migrated from. A step's `version` must never exceed the target version.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The build version that requires this migration.
    version: Version,
    /// The object that requires migration.
    object: Object,
    /// The migration mechanism to be used.
    mechanism: Mechanism,
}
133
/// The mechanism to use to migrate the schema of a builtin collection.
///
/// When several selected migration steps target the same object, `Replacement` takes precedence
/// over `Evolution`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[allow(dead_code)]
enum Mechanism {
    /// Persist schema evolution.
    ///
    /// Keeps existing contents but only works for schema changes that are backward compatible
    /// according to [`backward_compatible`].
    Evolution,
    /// Shard replacement.
    ///
    /// Works for arbitrary schema changes but loses existing contents.
    Replacement,
}
148
/// The object of a migration.
///
/// This has the same information as [`SystemObjectDescription`] but can be constructed in `const`
/// contexts, like the `MIGRATIONS` list.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Object {
    /// The catalog item type of the object (becomes `object_type` in the description).
    type_: CatalogItemType,
    /// The name of the schema containing the object (becomes `schema_name`).
    schema: &'static str,
    /// The name of the object (becomes `object_name`).
    name: &'static str,
}
159
160impl From<Object> for SystemObjectDescription {
161    fn from(object: Object) -> Self {
162        SystemObjectDescription {
163            schema_name: object.schema.into(),
164            object_type: object.type_,
165            object_name: object.name.into(),
166        }
167    }
168}
169
/// The result of a builtin schema migration.
///
/// The default value describes a migration that replaced nothing and needs no cleanup.
pub(super) struct MigrationResult {
    /// IDs of items whose shards have been replaced using the `Replacement` mechanism.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// A cleanup action to take once the migration has been made durable.
    pub cleanup_action: BoxFuture<'static, ()>,
}
177
178impl Default for MigrationResult {
179    fn default() -> Self {
180        Self {
181            replaced_items: Default::default(),
182            cleanup_action: async {}.boxed(),
183        }
184    }
185}
186
187/// Run builtin schema migrations.
188///
189/// This is the entry point used by adapter when opening the catalog. It uses the hardcoded
190/// `BUILTINS` and `MIGRATIONS` lists to initialize the lists of available builtins and required
191/// migrations, respectively.
192pub(super) async fn run(
193    build_info: &BuildInfo,
194    deploy_generation: u64,
195    txn: &mut Transaction<'_>,
196    config: BuiltinItemMigrationConfig,
197) -> Result<MigrationResult, Error> {
198    // Sanity check to ensure we're not touching durable state in read-only mode.
199    assert_eq!(config.read_only, txn.is_savepoint());
200
201    // Tests may provide a dummy build info that confuses the migration step selection logic. Skip
202    // migrations if we observe this build info.
203    if *build_info == DUMMY_BUILD_INFO {
204        return Ok(MigrationResult::default());
205    }
206
207    let Some(durable_version) = get_migration_version(txn) else {
208        // New catalog; nothing to do.
209        return Ok(MigrationResult::default());
210    };
211    let build_version = build_info.semver_version();
212
213    let collection_metadata = txn.get_collection_metadata();
214    let system_objects = txn
215        .get_system_object_mappings()
216        .map(|m| {
217            let object = m.description;
218            let global_id = m.unique_identifier.global_id;
219            let shard_id = collection_metadata.get(&global_id).copied();
220            let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
221                panic!("missing builtin {object:?}");
222            };
223            let info = ObjectInfo {
224                global_id,
225                shard_id,
226                builtin,
227                fingerprint: m.unique_identifier.fingerprint,
228            };
229            (object, info)
230        })
231        .collect();
232
233    let migration_shard = txn.get_builtin_migration_shard().expect("must exist");
234
235    let migration = Migration {
236        source_version: durable_version.clone(),
237        target_version: build_version.clone(),
238        deploy_generation,
239        system_objects,
240        migration_shard,
241        config,
242    };
243
244    let result = migration.run(MIGRATIONS).await.map_err(|e| {
245        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
246            last_seen_version: durable_version.to_string(),
247            this_version: build_version.to_string(),
248            cause: e.to_string(),
249        })
250    })?;
251
252    result.apply(txn);
253
254    let replaced_items = txn
255        .get_system_object_mappings()
256        .map(|m| m.unique_identifier)
257        .filter(|ids| result.new_shards.contains_key(&ids.global_id))
258        .map(|ids| ids.catalog_id)
259        .collect();
260
261    Ok(MigrationResult {
262        replaced_items,
263        cleanup_action: result.cleanup_action,
264    })
265}
266
/// Result produced by `Migration::run`.
///
/// The result is not durable by itself; `MigrationRunResult::apply` writes it into a catalog
/// transaction.
struct MigrationRunResult {
    /// Replacement shards, keyed by the global ID of the collection they now serve.
    new_shards: BTreeMap<GlobalId, ShardId>,
    /// New fingerprints to store for migrated objects, keyed by object description.
    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
    /// Shards to register as unfinalized, in addition to the shards displaced by `new_shards`.
    shards_to_finalize: BTreeSet<ShardId>,
    /// Cleanup action that is handed on to the caller as `MigrationResult::cleanup_action`.
    cleanup_action: BoxFuture<'static, ()>,
}
274
275impl Default for MigrationRunResult {
276    fn default() -> Self {
277        Self {
278            new_shards: BTreeMap::new(),
279            new_fingerprints: BTreeMap::new(),
280            shards_to_finalize: BTreeSet::new(),
281            cleanup_action: async {}.boxed(),
282        }
283    }
284}
285
286impl MigrationRunResult {
287    /// Apply this migration result to the given transaction.
288    fn apply(&self, txn: &mut Transaction<'_>) {
289        // Update collection metadata.
290        let replaced_ids = self.new_shards.keys().copied().collect();
291        let old_metadata = txn.delete_collection_metadata(replaced_ids);
292        txn.insert_collection_metadata(self.new_shards.clone())
293            .expect("inserting unique shards IDs after deleting existing entries");
294
295        // Register shards for finalization.
296        let mut unfinalized_shards: BTreeSet<_> =
297            old_metadata.into_iter().map(|(_, sid)| sid).collect();
298        unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
299        txn.insert_unfinalized_shards(unfinalized_shards)
300            .expect("cannot fail");
301
302        // Update fingerprints.
303        let mappings = txn
304            .get_system_object_mappings()
305            .filter_map(|m| {
306                let fingerprint = self.new_fingerprints.get(&m.description)?;
307                Some(SystemObjectMapping {
308                    description: m.description,
309                    unique_identifier: SystemObjectUniqueIdentifier {
310                        catalog_id: m.unique_identifier.catalog_id,
311                        global_id: m.unique_identifier.global_id,
312                        fingerprint: fingerprint.clone(),
313                    },
314                })
315            })
316            .collect();
317        txn.set_system_object_mappings(mappings)
318            .expect("filtered existing mappings remain unique");
319    }
320}
321
/// Information about a system object required to run a `Migration`.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// The global ID recorded in the object's system object mapping.
    global_id: GlobalId,
    /// The shard registered for `global_id` in the collection metadata, if any.
    shard_id: Option<ShardId>,
    /// The builtin definition matching the object's description.
    builtin: &'static Builtin<NameReference>,
    /// The fingerprint recorded in the object's system object mapping.
    fingerprint: String,
}
330
/// Context of a builtin schema migration.
struct Migration {
    /// The version we are migrating from.
    ///
    /// Same as the build version of the most recent leader process that successfully performed
    /// migrations.
    source_version: Version,
    /// The version we are migrating to.
    ///
    /// Same as the build version of this process.
    target_version: Version,
    /// The deploy generation of this process.
    deploy_generation: u64,
    /// Information about all objects in the system.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// The ID of the migration shard.
    migration_shard: ShardId,
    /// Additional configuration, including the persist client, the read-only flag, and an
    /// optional forced migration mechanism.
    config: BuiltinItemMigrationConfig,
}
351
352impl Migration {
353    async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
354        info!(
355            deploy_generation = %self.deploy_generation,
356            "running builtin schema migration: {} -> {}",
357            self.source_version, self.target_version
358        );
359
360        self.validate_migration_steps(steps);
361
362        let (force, plan) = match self.config.force_migration.as_deref() {
363            None => (false, self.plan_migration(steps)),
364            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
365            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
366            Some(other) => panic!("unknown force migration mechanism: {other}"),
367        };
368
369        if self.source_version == self.target_version && !force {
370            info!("skipping migration: already at target version");
371            return Ok(MigrationRunResult::default());
372        } else if self.source_version > self.target_version {
373            bail!("downgrade not supported");
374        }
375
376        // In leader mode, upgrade the version of the migration shard to the target version.
377        // This fences out any readers at lower versions.
378        if !self.config.read_only {
379            self.upgrade_migration_shard_version().await;
380        }
381
382        info!("executing migration plan: {plan:?}");
383
384        self.migrate_evolve(&plan.evolve).await?;
385        let new_shards = self.migrate_replace(&plan.replace).await?;
386
387        let mut migrated_objects = BTreeSet::new();
388        migrated_objects.extend(plan.evolve);
389        migrated_objects.extend(plan.replace);
390
391        let new_fingerprints = self.update_fingerprints(&migrated_objects)?;
392
393        let (shards_to_finalize, cleanup_action) = self.cleanup().await?;
394
395        Ok(MigrationRunResult {
396            new_shards,
397            new_fingerprints,
398            shards_to_finalize,
399            cleanup_action,
400        })
401    }
402
403    /// Sanity check the given migration steps.
404    ///
405    /// If any of these checks fail, that's a bug in Materialize, and we panic immediately.
406    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
407        for step in steps {
408            assert!(
409                step.version <= self.target_version,
410                "migration step version greater than target version: {} > {}",
411                step.version,
412                self.target_version,
413            );
414
415            let object = SystemObjectDescription::from(step.object.clone());
416
417            // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly, it would
418            // cause the table to be truncated because the contents are not also stored in the durable
419            // catalog. Secondly, we prune `mz_storage_usage_by_shard` of old events in the background
420            // on startup. The correctness of that pruning relies on there being no other retractions
421            // to `mz_storage_usage_by_shard`.
422            //
423            // TODO: Confirm the above reasoning, it might be outdated?
424            assert_ne!(
425                *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
426                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
427            );
428
429            let Some(object_info) = self.system_objects.get(&object) else {
430                panic!("migration step for non-existent builtin: {object:?}");
431            };
432
433            let builtin = object_info.builtin;
434            use Builtin::*;
435            assert!(
436                matches!(builtin, Table(..) | Source(..) | ContinualTask(..)),
437                "schema migration not supported for builtin: {builtin:?}",
438            );
439        }
440    }
441
442    /// Select for each object to migrate the appropriate migration mechanism.
443    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
444        // Ignore any steps at versions before `source_version`.
445        let steps = steps.iter().filter(|s| s.version > self.source_version);
446
447        // Select a mechanism for each object, according to the requested migrations:
448        //  * If any `Replacement` was requested, use `Replacement`.
449        //  * Otherwise, (i.e. only `Evolution` was requested), use `Evolution`.
450        let mut by_object = BTreeMap::new();
451        for step in steps {
452            if let Some(entry) = by_object.get_mut(&step.object) {
453                *entry = match (step.mechanism, *entry) {
454                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
455                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
456                        Mechanism::Replacement
457                    }
458                };
459            } else {
460                by_object.insert(step.object.clone(), step.mechanism);
461            }
462        }
463
464        let mut plan = Plan::default();
465        for (object, mechanism) in by_object {
466            match mechanism {
467                Mechanism::Evolution => plan.evolve.push(object.into()),
468                Mechanism::Replacement => plan.replace.push(object.into()),
469            }
470        }
471
472        plan
473    }
474
475    /// Plan a forced migration of all objects using the given mechanism.
476    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
477        let objects = self
478            .system_objects
479            .iter()
480            .filter(|(_, info)| matches!(info.builtin, Builtin::Table(..) | Builtin::Source(..)))
481            .map(|(object, _)| object.clone())
482            .collect();
483
484        let mut plan = Plan::default();
485        match mechanism {
486            Mechanism::Evolution => plan.evolve = objects,
487            Mechanism::Replacement => plan.replace = objects,
488        }
489
490        plan
491    }
492
493    /// Upgrade the migration shard to the target version.
494    async fn upgrade_migration_shard_version(&self) {
495        let persist = &self.config.persist_client;
496        let diagnostics = Diagnostics {
497            shard_name: "builtin_migration".to_string(),
498            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
499        };
500
501        persist
502            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
503                self.migration_shard,
504                diagnostics,
505            )
506            .await
507            .expect("valid usage");
508    }
509
510    /// Migrate the given objects using the `Evolution` mechanism.
511    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
512        for object in objects {
513            self.migrate_evolve_one(object).await?;
514        }
515        Ok(())
516    }
517
    /// Migrate a single object using the `Evolution` mechanism.
    ///
    /// In read-only mode this only checks that the target schema is backward compatible with the
    /// latest schema registered for the object's shard, without registering anything. In leader
    /// mode the target schema is registered: as the initial schema if none exists yet, otherwise
    /// through compare-and-evolve, which is retried when the schema changes concurrently.
    ///
    /// An object without a registered shard is skipped in leader mode.
    ///
    /// # Errors
    ///
    /// Fails if the object is unknown, if it is not a table, source, or continual task, if it has
    /// no registered shard in read-only mode, if computing the columnar data types for the
    /// compatibility check fails, or if the schema change is not backward compatible.
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let persist = &self.config.persist_client;

        let Some(object_info) = self.system_objects.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let id = object_info.global_id;

        let Some(shard_id) = object_info.shard_id else {
            // No shard is registered for this builtin. In leader mode, this is fine, we'll
            // register the shard during bootstrap. In read-only mode, we might be racing with the
            // leader to register the shard and it's unclear what sort of confusion can arise from
            // that -- better to bail out in this case.
            if self.config.read_only {
                bail!("missing shard ID for builtin {object:?} ({id})");
            } else {
                return Ok(());
            }
        };

        // The schema we want the shard to end up with, as defined by the builtin.
        let target_desc = match object_info.builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::ContinualTask(ct) => &ct.desc,
            _ => bail!("not a storage collection: {object:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        // The most recent schema registered for the shard, if any.
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        if self.config.read_only {
            // In read-only mode, only check that the new schema is backward compatible.
            // We'll register it when/if we restart in leader mode.
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                         {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        // Leader mode from here on: determine the schema to evolve from.
        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                // If no schema was previously registered, simply try to register the new one. This
                // might fail due to a concurrent registration, in which case we'll fall back to
                // `compare_and_evolve_schema`.

                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                if schema_id.is_some() {
                    return Ok(());
                }

                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        loop {
            // Evolving the schema might fail if another process evolved the schema concurrently,
            // in which case we need to retry. Most likely the other process evolved the schema to
            // our own target schema and the second try is a no-op.

            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                     {source_desc:?} -> {target_desc:?}"
                ),
                // Our expected schema was stale; retry against the schema reported back.
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }
647
648    /// Migrate the given objects using the `Replacement` mechanism.
649    async fn migrate_replace(
650        &self,
651        objects: &[SystemObjectDescription],
652    ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
653        if objects.is_empty() {
654            return Ok(Default::default());
655        }
656
657        let diagnostics = Diagnostics {
658            shard_name: "builtin_migration".to_string(),
659            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
660        };
661        let (mut persist_write, mut persist_read) =
662            self.open_migration_shard(diagnostics.clone()).await;
663
664        let mut ids_to_replace = BTreeSet::new();
665        for object in objects {
666            if let Some(info) = self.system_objects.get(object) {
667                ids_to_replace.insert(info.global_id);
668            } else {
669                bail!("missing id for builtin {object:?}");
670            }
671        }
672
673        info!(?objects, ?ids_to_replace, "migrating by replacement");
674
675        // Fetch replacement shard IDs from the migration shard, or insert new ones if none exist.
676        // This can fail due to writes by concurrent processes, so we need to retry.
677        let replaced_shards = loop {
678            if let Some(shards) = self
679                .try_get_or_insert_replacement_shards(
680                    &ids_to_replace,
681                    &mut persist_write,
682                    &mut persist_read,
683                )
684                .await?
685            {
686                break shards;
687            }
688        };
689
690        Ok(replaced_shards)
691    }
692
    /// Try to get or insert replacement shards for the given IDs into the migration shard, at
    /// `target_version` and `deploy_generation`.
    ///
    /// This method looks for existing entries in the migration shards and returns those if they
    /// are present. Otherwise it generates new shard IDs and tries to insert them.
    ///
    /// The result of this call is `None` if no existing entries were found and inserting new ones
    /// failed because of a concurrent write to the migration shard. In this case, the caller is
    /// expected to retry.
    ///
    /// # Errors
    ///
    /// Fails if a replacement shard needs to be generated for an ID that is not a system ID.
    ///
    /// # Panics
    ///
    /// Panics if the migration shard's upper is empty, or if persist reports invalid usage for
    /// the append.
    async fn try_get_or_insert_replacement_shards(
        &self,
        ids_to_replace: &BTreeSet<GlobalId>,
        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
        // The current upper doubles as the timestamp at which we attempt to write.
        let upper = persist_write.fetch_recent_upper().await;
        let write_ts = *upper.as_option().expect("migration shard not sealed");

        // Work on a copy: IDs are removed from it as existing replacement shards are found.
        let mut ids_to_replace = ids_to_replace.clone();
        let mut replaced_shards = BTreeMap::new();

        // Another process might already have done a shard replacement at our version and
        // generation, in which case we can directly reuse the replacement shards.
        //
        // Note that we can't assume that the previous process had the same `ids_to_replace` as we
        // do. The set of migrations to run depends on both the source and the target version, and
        // the migration shard is not keyed by source version. The previous writer might have seen
        // a different source version, if there was a concurrent migration by a leader process.
        if let Some(read_ts) = write_ts.step_back() {
            let pred = |key: &migration_shard::Key| {
                key.build_version == self.target_version
                    && key.deploy_generation == Some(self.deploy_generation)
            };
            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
                for (key, shard_id) in entries {
                    let id = GlobalId::System(key.global_id);
                    // Only reuse entries for IDs that we actually want to replace.
                    if ids_to_replace.remove(&id) {
                        replaced_shards.insert(id, shard_id);
                    }
                }

                debug!(
                    %read_ts, ?replaced_shards, ?ids_to_replace,
                    "found existing entries in migration shard",
                );
            }

            // Everything we need is already present; no write required.
            if ids_to_replace.is_empty() {
                return Ok(Some(replaced_shards));
            }
        }

        // Generate new shard IDs and attempt to insert them into the migration shard. If we get a
        // CaA failure at `write_ts` that means a concurrent process has inserted in the meantime
        // and we need to re-check the migration shard contents.
        let mut updates = Vec::new();
        for id in ids_to_replace {
            let shard_id = ShardId::new();
            replaced_shards.insert(id, shard_id);

            let GlobalId::System(global_id) = id else {
                bail!("attempt to migrate a non-system collection: {id}");
            };
            let key = migration_shard::Key {
                global_id,
                build_version: self.target_version.clone(),
                deploy_generation: Some(self.deploy_generation),
            };
            updates.push(((key, shard_id), write_ts, 1));
        }

        // Append at exactly `write_ts`, advancing the upper by one step on success.
        let upper = Antichain::from_elem(write_ts);
        let new_upper = Antichain::from_elem(write_ts.step_forward());
        debug!(%write_ts, "attempting insert into migration shard");
        let result = persist_write
            .compare_and_append(updates, upper, new_upper)
            .await
            .expect("valid usage");

        match result {
            Ok(()) => {
                debug!(
                    %write_ts, ?replaced_shards,
                    "successfully inserted into migration shard"
                );
                Ok(Some(replaced_shards))
            }
            // The upper moved underneath us; signal the caller to retry.
            Err(_mismatch) => Ok(None),
        }
    }
783
784    /// Open writer and reader for the migration shard.
785    async fn open_migration_shard(
786        &self,
787        diagnostics: Diagnostics,
788    ) -> (
789        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
790        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
791    ) {
792        let persist = &self.config.persist_client;
793
794        persist
795            .open(
796                self.migration_shard,
797                Arc::new(migration_shard::KeySchema),
798                Arc::new(ShardIdSchema),
799                diagnostics,
800                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
801            )
802            .await
803            .expect("valid usage")
804    }
805
806    /// Open a [`SinceHandle`] for the migration shard.
807    async fn open_migration_shard_since(
808        &self,
809        diagnostics: Diagnostics,
810    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff, i64> {
811        self.config
812            .persist_client
813            .open_critical_since(
814                self.migration_shard,
815                // TODO: We may need to use a different critical reader
816                // id for this if we want to be able to introspect it via SQL.
817                PersistClient::CONTROLLER_CRITICAL_SINCE,
818                diagnostics.clone(),
819            )
820            .await
821            .expect("valid usage")
822    }
823
824    /// Update the fingerprints for `migrated_items`.
825    ///
826    /// Returns the new fingerprints. Also asserts that the current fingerprints of all other
827    /// system items match their builtin definitions.
828    fn update_fingerprints(
829        &self,
830        migrated_items: &BTreeSet<SystemObjectDescription>,
831    ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
832        let mut new_fingerprints = BTreeMap::new();
833        for (object, object_info) in &self.system_objects {
834            let id = object_info.global_id;
835            let builtin = object_info.builtin;
836
837            let fingerprint = builtin.fingerprint();
838            if fingerprint == object_info.fingerprint {
839                continue; // fingerprint unchanged, nothing to do
840            }
841
842            // Fingerprint mismatch is expected for a migrated item.
843            let migrated = migrated_items.contains(object);
844            // Some builtin types have schemas but no durable state. No migration needed for those.
845            let ephemeral = matches!(
846                builtin,
847                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
848            );
849
850            if migrated || ephemeral {
851                new_fingerprints.insert(object.clone(), fingerprint);
852            } else if builtin.runtime_alterable() {
853                // Runtime alterable builtins have no meaningful builtin fingerprint, and a
854                // sentinel value stored in the catalog.
855                assert_eq!(
856                    object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
857                    "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
858                );
859            } else {
860                panic!(
861                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
862                    fingerprint, object_info.fingerprint,
863                );
864            }
865        }
866
867        Ok(new_fingerprints)
868    }
869
    /// Perform cleanup of migration state, i.e. the migration shard.
    ///
    /// Returns a list of shards to finalize, and a `Future` that must be run after the shard
    /// finalization has been durably enqueued. The `Future` is used to remove entries from the
    /// migration shard only after we know the respective shards will be finalized. Removing
    /// entries immediately would risk leaking the shards.
    ///
    /// We only perform cleanup in leader mode, to keep the durable state changes made by read-only
    /// processes as minimal as possible. Given that Materialize doesn't support version
    /// downgrades, it is safe to assume that any state for versions below the `target_version` is
    /// not needed anymore and can be cleaned up.
    ///
    /// Note that it is fine for cleanup to sometimes fail or be skipped. The size of the migration
    /// shard should always be pretty small, so keeping migration state around for longer isn't a
    /// concern. As a result, we can keep the logic simple here and skip doing cleanup in response
    /// to transient failures, instead of retrying.
    ///
    /// An empty shard set and a no-op `Future` are returned when running in read-only mode, when
    /// there is no timestamp before the shard's upper to read at, or when no stale entries are
    /// found.
    async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
        let noop_action = async {}.boxed();
        let noop_result = (BTreeSet::new(), noop_action);

        if self.config.read_only {
            return Ok(noop_result);
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: "builtin schema migration cleanup".into(),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;
        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;

        // Retractions get written at the current upper; existing contents are read at the
        // timestamp just before it.
        let upper = persist_write.fetch_recent_upper().await.clone();
        let write_ts = *upper.as_option().expect("migration shard not sealed");
        let Some(read_ts) = write_ts.step_back() else {
            return Ok(noop_result);
        };

        // Collect old entries to remove.
        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
        else {
            return Ok(noop_result);
        };

        debug!(
            ?stale_entries,
            "cleaning migration shard up to version {}", self.target_version,
        );

        // The shards currently backing system objects, keyed by global ID.
        let current_shards: BTreeMap<_, _> = self
            .system_objects
            .values()
            .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
            .collect();

        let mut shards_to_finalize = BTreeSet::new();
        let mut retractions = Vec::new();
        for (key, shard_id) in stale_entries {
            // The migration shard contains both shards created during aborted upgrades and shards
            // created during successful upgrades. The latter may still be in use, so we have to
            // check and only finalize those that aren't anymore.
            let gid = GlobalId::System(key.global_id);
            if current_shards.get(&gid) != Some(&shard_id) {
                shards_to_finalize.insert(shard_id);
            }

            // Every stale entry is retracted, whether or not its shard gets finalized.
            retractions.push(((key, shard_id), write_ts, -1));
        }

        // The retractions are not written here but packaged into the returned future, which the
        // caller runs later. A failed compare-and-append is only logged, not retried.
        let cleanup_action = async move {
            if !retractions.is_empty() {
                let new_upper = Antichain::from_elem(write_ts.step_forward());
                let result = persist_write
                    .compare_and_append(retractions, upper, new_upper)
                    .await
                    .expect("valid usage");
                match result {
                    Ok(()) => debug!("cleaned up migration shard"),
                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
                }
            }
        }
        .boxed();

        // Downgrade the since, to enable some compaction.
        let o = *persist_since.opaque();
        let new_since = Antichain::from_elem(read_ts);
        let result = persist_since
            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
            .await;
        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");

        Ok((shards_to_finalize, cleanup_action))
    }
965}
966
967/// Read the migration shard at the given timestamp, returning all entries that match the given
968/// predicate.
969///
970/// Returns `None` if the migration shard contains no matching entries, or if it isn't readable at
971/// `read_ts`.
972async fn read_migration_shard<P>(
973    persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
974    read_ts: Timestamp,
975    predicate: P,
976) -> Option<Vec<(migration_shard::Key, ShardId)>>
977where
978    P: for<'a> Fn(&migration_shard::Key) -> bool,
979{
980    let as_of = Antichain::from_elem(read_ts);
981    let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
982
983    assert!(
984        updates.iter().all(|(_, _, diff)| *diff == 1),
985        "migration shard contains invalid diffs: {updates:?}",
986    );
987
988    let entries: Vec<_> = updates
989        .into_iter()
990        .filter_map(|(data, _, _)| {
991            if let (Ok(key), Ok(val)) = data {
992                Some((key, val))
993            } else {
994                // If we can't decode the data, it has likely been written by a newer version, so
995                // we ignore it.
996                info!("skipping unreadable migration shard entry: {data:?}");
997                None
998            }
999        })
1000        .filter(move |(key, _)| predicate(key))
1001        .collect();
1002
1003    (!entries.is_empty()).then_some(entries)
1004}
1005
/// A plan to migrate between two versions.
///
/// Groups the objects that need migrating by the mechanism to apply to them.
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate using the `Evolution` mechanism, i.e. by evolving the schema of the
    /// existing persist shard in-place.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate using the `Replacement` mechanism, i.e. by creating a new shard and
    /// discarding the existing data.
    replace: Vec<SystemObjectDescription>,
}
1014
1015/// Types and persist codec impls for the migration shard used by the `Replacement` mechanism.
1016mod migration_shard {
1017    use std::fmt;
1018    use std::str::FromStr;
1019
1020    use arrow::array::{StringArray, StringBuilder};
1021    use bytes::{BufMut, Bytes};
1022    use mz_persist_types::Codec;
1023    use mz_persist_types::codec_impls::{
1024        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
1025    };
1026    use mz_persist_types::columnar::Schema;
1027    use mz_persist_types::stats::NoneStats;
1028    use semver::Version;
1029    use serde::{Deserialize, Serialize};
1030
1031    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
1032    pub(super) struct Key {
1033        pub(super) global_id: u64,
1034        pub(super) build_version: Version,
1035        // Versions < 26.0 didn't include the deploy generation. As long as we still might
1036        // encounter migration shard entries that don't have it, we need to keep this an `Option`
1037        // and keep supporting both key formats.
1038        pub(super) deploy_generation: Option<u64>,
1039    }
1040
1041    impl fmt::Display for Key {
1042        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1043            if self.deploy_generation.is_some() {
1044                // current format
1045                let s = serde_json::to_string(self).expect("JSON serializable");
1046                f.write_str(&s)
1047            } else {
1048                // pre-26.0 format
1049                write!(f, "{}-{}", self.global_id, self.build_version)
1050            }
1051        }
1052    }
1053
1054    impl FromStr for Key {
1055        type Err = String;
1056
1057        fn from_str(s: &str) -> Result<Self, String> {
1058            // current format
1059            if let Ok(key) = serde_json::from_str(s) {
1060                return Ok(key);
1061            };
1062
1063            // pre-26.0 format
1064            let parts: Vec<_> = s.splitn(2, '-').collect();
1065            let &[global_id, build_version] = parts.as_slice() else {
1066                return Err(format!("invalid Key '{s}'"));
1067            };
1068            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
1069            let build_version = build_version
1070                .parse::<Version>()
1071                .map_err(|e| e.to_string())?;
1072            Ok(Key {
1073                global_id,
1074                build_version,
1075                deploy_generation: None,
1076            })
1077        }
1078    }
1079
1080    impl Default for Key {
1081        fn default() -> Self {
1082            Self {
1083                global_id: Default::default(),
1084                build_version: Version::new(0, 0, 0),
1085                deploy_generation: Some(0),
1086            }
1087        }
1088    }
1089
1090    impl Codec for Key {
1091        type Schema = KeySchema;
1092        type Storage = ();
1093
1094        fn codec_name() -> String {
1095            "TableKey".into()
1096        }
1097
1098        fn encode<B: BufMut>(&self, buf: &mut B) {
1099            buf.put(self.to_string().as_bytes())
1100        }
1101
1102        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
1103            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
1104            s.parse()
1105        }
1106
1107        fn encode_schema(_schema: &KeySchema) -> Bytes {
1108            Bytes::new()
1109        }
1110
1111        fn decode_schema(buf: &Bytes) -> Self::Schema {
1112            assert_eq!(*buf, Bytes::new());
1113            KeySchema
1114        }
1115    }
1116
1117    impl SimpleColumnarData for Key {
1118        type ArrowBuilder = StringBuilder;
1119        type ArrowColumn = StringArray;
1120
1121        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
1122            builder.values_slice().len()
1123        }
1124
1125        fn push(&self, builder: &mut Self::ArrowBuilder) {
1126            builder.append_value(&self.to_string());
1127        }
1128
1129        fn push_null(builder: &mut Self::ArrowBuilder) {
1130            builder.append_null();
1131        }
1132
1133        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
1134            *self = column.value(idx).parse().expect("valid Key");
1135        }
1136    }
1137
1138    #[derive(Debug, PartialEq)]
1139    pub(super) struct KeySchema;
1140
1141    impl Schema<Key> for KeySchema {
1142        type ArrowColumn = StringArray;
1143        type Statistics = NoneStats;
1144        type Decoder = SimpleColumnarDecoder<Key>;
1145        type Encoder = SimpleColumnarEncoder<Key>;
1146
1147        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
1148            Ok(SimpleColumnarEncoder::default())
1149        }
1150
1151        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
1152            Ok(SimpleColumnarDecoder::new(col))
1153        }
1154    }
1155}
1156
// The tests are kept in a separate file, named by `#[path]`, and compiled only in test builds.
#[cfg(test)]
#[path = "builtin_schema_migration_tests.rs"]
mod tests;