// mz_adapter/catalog/open/builtin_schema_migration.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for migrating the schemas of builtin storage collections.
11//!
12//! If a version upgrade changes the schema of a builtin collection that's made durable in persist,
13//! that persist shard's schema must be migrated accordingly. The migration must happen in a way
14//! that's compatible with 0dt upgrades: Read-only environments need to be able to read the
15//! collections with the new schema, without interfering with the leader environment's continued
16//! use of the old schema.
17//!
18//! Two migration mechanisms are provided:
19//!
20//!  * [`Mechanism::Evolution`] uses persist's schema evolution support to evolve the persist
21//!    shard's schema in-place. Only works for backward-compatible changes.
22//!  * [`Mechanism::Replacement`] creates a new shard to serve the builtin collection in the new
23//!    version. Works for all schema changes but discards existing data.
24//!
25//! Which mechanism to use is selected through entries in the `MIGRATIONS` list. In general, the
26//! `Evolution` mechanism should be used when possible, as it avoids data loss.
27//!
28//! For more context and details on the implementation, see
29//! `doc/developer/design/20251015_builtin_schema_migration.md`.
30
31use std::collections::{BTreeMap, BTreeSet};
32use std::sync::{Arc, LazyLock};
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39    BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40    MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
41};
42use mz_catalog::config::BuiltinItemMigrationConfig;
43use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
44use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
45use mz_catalog::memory::error::{Error, ErrorKind};
46use mz_ore::soft_assert_or_log;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
48use mz_persist_client::critical::{Opaque, SinceHandle};
49use mz_persist_client::read::ReadHandle;
50use mz_persist_client::schema::CaESchema;
51use mz_persist_client::write::WriteHandle;
52use mz_persist_client::{Diagnostics, PersistClient};
53use mz_persist_types::ShardId;
54use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
55use mz_persist_types::schema::backward_compatible;
56use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
57use mz_repr::{CatalogItemId, GlobalId, Timestamp};
58use mz_sql::catalog::{CatalogItemType, NameReference};
59use mz_storage_client::controller::StorageTxn;
60use mz_storage_types::StorageDiff;
61use mz_storage_types::sources::SourceData;
62use semver::Version;
63use timely::progress::Antichain;
64use tracing::{debug, info};
65
66use crate::catalog::migrate::get_migration_version;
67
/// Builtin schema migrations required to upgrade to the current build version.
///
/// Migration steps for old versions must be retained around according to the upgrade policy. For
/// example, if we support upgrading one major version at a time, the release of version `N.0.0`
/// can delete all migration steps with versions before `(N-1).0.0`.
///
/// Note that `replacement` steps discard the existing contents of the collection, while
/// `evolution` steps keep them (see the module docs).
///
/// Smallest supported version: 0.147.0
static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
    vec![
        MigrationStep::replacement(
            "0.149.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_sink_statistics_raw",
        ),
        MigrationStep::replacement(
            "0.149.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_source_statistics_raw",
        ),
        MigrationStep::evolution(
            "0.159.0",
            CatalogItemType::Source,
            MZ_INTERNAL_SCHEMA,
            "mz_cluster_replica_metrics_history",
        ),
        MigrationStep::replacement(
            "0.160.0",
            CatalogItemType::Table,
            MZ_CATALOG_SCHEMA,
            "mz_roles",
        ),
        MigrationStep::replacement(
            "0.160.0",
            CatalogItemType::Table,
            MZ_CATALOG_SCHEMA,
            "mz_sinks",
        ),
        MigrationStep::replacement(
            "26.18.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_databases",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_schemas",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_role_members",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_network_policies",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_network_policy_rules",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_cluster_workload_classes",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_internal_cluster_replicas",
        ),
        MigrationStep::replacement(
            "26.19.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_INTERNAL_SCHEMA,
            "mz_pending_cluster_replicas",
        ),
        MigrationStep::replacement(
            "26.20.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_materialized_views",
        ),
        MigrationStep::replacement(
            "26.22.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_connections",
        ),
        MigrationStep::replacement(
            "26.22.0-dev.0",
            CatalogItemType::MaterializedView,
            MZ_CATALOG_SCHEMA,
            "mz_secrets",
        ),
    ]
});
175
/// A migration required to upgrade past a specific version.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The build version that requires this migration.
    ///
    /// Steps whose version is at or before the durable source version are skipped, as they have
    /// already been applied by a previous upgrade.
    version: Version,
    /// The object that requires migration.
    object: SystemObjectDescription,
    /// The migration mechanism to be used.
    mechanism: Mechanism,
}
186
187impl MigrationStep {
188    /// Helper to construct an `Evolution` migration step.
189    fn evolution(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
190        Self {
191            version: Version::parse(version).expect("valid"),
192            object: SystemObjectDescription {
193                schema_name: schema.into(),
194                object_type: type_,
195                object_name: name.into(),
196            },
197            mechanism: Mechanism::Evolution,
198        }
199    }
200
201    /// Helper to construct a `Replacement` migration step.
202    fn replacement(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
203        Self {
204            version: Version::parse(version).expect("valid"),
205            object: SystemObjectDescription {
206                schema_name: schema.into(),
207                object_type: type_,
208                object_name: name.into(),
209            },
210            mechanism: Mechanism::Replacement,
211        }
212    }
213}
214
/// The mechanism to use to migrate the schema of a builtin collection.
///
/// The derived `Ord` follows declaration order, i.e. `Evolution < Replacement`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
// NOTE(review): `dead_code` is allowed here — presumably because at any given time `MIGRATIONS`
// may reference only one of the two variants; confirm before removing.
#[allow(dead_code)]
enum Mechanism {
    /// Persist schema evolution.
    ///
    /// Keeps existing contents but only works for schema changes that are backward compatible
    /// according to [`backward_compatible`].
    Evolution,
    /// Shard replacement.
    ///
    /// Works for arbitrary schema changes but loses existing contents.
    Replacement,
}
229
/// The result of a builtin schema migration.
pub(super) struct MigrationResult {
    /// IDs of items whose shards have been replaced using the `Replacement` mechanism.
    ///
    /// These collections have lost their previous contents (see the module docs).
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// A cleanup action to take once the migration has been made durable.
    pub cleanup_action: BoxFuture<'static, ()>,
}
237
238impl Default for MigrationResult {
239    fn default() -> Self {
240        Self {
241            replaced_items: Default::default(),
242            cleanup_action: async {}.boxed(),
243        }
244    }
245}
246
/// Run builtin schema migrations.
///
/// This is the entry point used by adapter when opening the catalog. It uses the hardcoded
/// `BUILTINS` and `MIGRATIONS` lists to initialize the lists of available builtins and required
/// migrations, respectively.
pub(super) async fn run(
    build_info: &BuildInfo,
    deploy_generation: u64,
    txn: &mut Transaction<'_>,
    config: BuiltinItemMigrationConfig,
) -> Result<MigrationResult, Error> {
    // Sanity check to ensure we're not touching durable state in read-only mode.
    assert_eq!(config.read_only, txn.is_savepoint());

    // Tests may provide a dummy build info that confuses the migration step selection logic. Skip
    // migrations if we observe this build info.
    if *build_info == DUMMY_BUILD_INFO {
        return Ok(MigrationResult::default());
    }

    let Some(durable_version) = get_migration_version(txn) else {
        // New catalog; nothing to do.
        return Ok(MigrationResult::default());
    };
    let build_version = build_info.semver_version();

    // Collect, for each durable system object, the information the migration needs: its
    // `GlobalId`, its persist shard (if one is registered), its static builtin definition, and
    // its recorded fingerprint.
    let collection_metadata = txn.get_collection_metadata();
    let system_objects = txn
        .get_system_object_mappings()
        .map(|m| {
            let object = m.description;
            let global_id = m.unique_identifier.global_id;
            let shard_id = collection_metadata.get(&global_id).copied();
            let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
                panic!("missing builtin {object:?}");
            };
            let info = ObjectInfo {
                global_id,
                shard_id,
                builtin,
                fingerprint: m.unique_identifier.fingerprint,
            };
            (object, info)
        })
        .collect();

    let migration_shard = txn.get_builtin_migration_shard().expect("must exist");

    let migration = Migration {
        source_version: durable_version.clone(),
        target_version: build_version.clone(),
        deploy_generation,
        system_objects,
        migration_shard,
        config,
    };

    let result = migration.run(&MIGRATIONS).await.map_err(|e| {
        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
            last_seen_version: durable_version.to_string(),
            this_version: build_version.to_string(),
            cause: e.to_string(),
        })
    })?;

    // Make the migration durable by writing its outcome into the catalog transaction.
    result.apply(txn);

    // Translate the replaced `GlobalId`s into the corresponding `CatalogItemId`s for the caller.
    let replaced_items = txn
        .get_system_object_mappings()
        .map(|m| m.unique_identifier)
        .filter(|ids| result.new_shards.contains_key(&ids.global_id))
        .map(|ids| ids.catalog_id)
        .collect();

    Ok(MigrationResult {
        replaced_items,
        cleanup_action: result.cleanup_action,
    })
}
326
/// Result produced by `Migration::run`.
struct MigrationRunResult {
    /// The replacement persist shard for each collection migrated via `Mechanism::Replacement`,
    /// keyed by the collection's `GlobalId`.
    new_shards: BTreeMap<GlobalId, ShardId>,
    /// Updated fingerprints for the migrated system objects.
    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
    /// Shards to register for finalization, in addition to the shards that were replaced.
    shards_to_finalize: BTreeSet<ShardId>,
    /// A cleanup action to take once the migration has been made durable.
    cleanup_action: BoxFuture<'static, ()>,
}
334
335impl Default for MigrationRunResult {
336    fn default() -> Self {
337        Self {
338            new_shards: BTreeMap::new(),
339            new_fingerprints: BTreeMap::new(),
340            shards_to_finalize: BTreeSet::new(),
341            cleanup_action: async {}.boxed(),
342        }
343    }
344}
345
impl MigrationRunResult {
    /// Apply this migration result to the given transaction.
    ///
    /// Updates collection metadata, registers obsolete shards for finalization, and rewrites the
    /// fingerprints of migrated system objects.
    fn apply(&self, txn: &mut Transaction<'_>) {
        // Update collection metadata: point each replaced collection at its new shard.
        let replaced_ids = self.new_shards.keys().copied().collect();
        let old_metadata = txn.delete_collection_metadata(replaced_ids);
        txn.insert_collection_metadata(self.new_shards.clone())
            .expect("inserting unique shards IDs after deleting existing entries");

        // Register shards for finalization: the shards previously backing the replaced
        // collections, plus any extra shards this result wants finalized.
        let mut unfinalized_shards: BTreeSet<_> =
            old_metadata.into_iter().map(|(_, sid)| sid).collect();
        unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
        txn.insert_unfinalized_shards(unfinalized_shards)
            .expect("cannot fail");

        // Update fingerprints of migrated objects; mappings without a new fingerprint are left
        // untouched.
        let mappings = txn
            .get_system_object_mappings()
            .filter_map(|m| {
                let fingerprint = self.new_fingerprints.get(&m.description)?;
                Some(SystemObjectMapping {
                    description: m.description,
                    unique_identifier: SystemObjectUniqueIdentifier {
                        catalog_id: m.unique_identifier.catalog_id,
                        global_id: m.unique_identifier.global_id,
                        fingerprint: fingerprint.clone(),
                    },
                })
            })
            .collect();
        txn.set_system_object_mappings(mappings)
            .expect("filtered existing mappings remain unique");
    }
}
381
/// Information about a system object required to run a `Migration`.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// The `GlobalId` of the object's storage collection.
    global_id: GlobalId,
    /// The persist shard backing the collection, if one is registered.
    shard_id: Option<ShardId>,
    /// The object's static builtin definition.
    builtin: &'static Builtin<NameReference>,
    /// The object's fingerprint, as recorded in the durable catalog.
    fingerprint: String,
}
390
/// Context of a builtin schema migration.
struct Migration {
    /// The version we are migrating from.
    ///
    /// Same as the build version of the most recent leader process that successfully performed
    /// migrations.
    source_version: Version,
    /// The version we are migrating to.
    ///
    /// Same as the build version of this process.
    target_version: Version,
    /// The deploy generation of this process.
    deploy_generation: u64,
    /// Information about all objects in the system.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// The ID of the migration shard.
    migration_shard: ShardId,
    /// Additional configuration.
    config: BuiltinItemMigrationConfig,
}
411
412impl Migration {
    /// Run the migration, executing the subset of `steps` required to get from
    /// `source_version` to `target_version`.
    ///
    /// Returns the durable changes to be applied by the caller, or an error if the migration
    /// failed (e.g. because of an incompatible schema evolution or an attempted downgrade).
    async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
        info!(
            deploy_generation = %self.deploy_generation,
            "running builtin schema migration: {} -> {}",
            self.source_version, self.target_version
        );

        self.validate_migration_steps(steps);

        // A forced migration migrates all eligible objects with a single mechanism, regardless
        // of the configured steps.
        let (force, plan) = match self.config.force_migration.as_deref() {
            None => (false, self.plan_migration(steps)),
            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
            Some(other) => panic!("unknown force migration mechanism: {other}"),
        };

        if self.source_version == self.target_version && !force {
            info!("skipping migration: already at target version");
            return Ok(MigrationRunResult::default());
        } else if self.source_version > self.target_version {
            bail!("downgrade not supported");
        }

        // In leader mode, upgrade the version of the migration shard to the target version.
        // This fences out any readers at lower versions.
        if !self.config.read_only {
            self.upgrade_migration_shard_version().await;
        }

        info!("executing migration plan: {plan:?}");

        self.migrate_evolve(&plan.evolve).await?;
        let new_shards = self.migrate_replace(&plan.replace).await?;

        let mut migrated_objects = BTreeSet::new();
        migrated_objects.extend(plan.evolve);
        migrated_objects.extend(plan.replace);

        let new_fingerprints = self.update_fingerprints(&migrated_objects)?;

        let (shards_to_finalize, cleanup_action) = self.cleanup().await?;

        Ok(MigrationRunResult {
            new_shards,
            new_fingerprints,
            shards_to_finalize,
            cleanup_action,
        })
    }
462
463    /// Sanity check the given migration steps.
464    ///
465    /// If any of these checks fail, that's a bug in Materialize, and we panic immediately.
466    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
467        for step in steps {
468            assert!(
469                step.version <= self.target_version,
470                "migration step version greater than target version: {} > {}",
471                step.version,
472                self.target_version,
473            );
474
475            let object = &step.object;
476
477            // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly, it would
478            // cause the table to be truncated because the contents are not also stored in the durable
479            // catalog. Secondly, we prune `mz_storage_usage_by_shard` of old events in the background
480            // on startup. The correctness of that pruning relies on there being no other retractions
481            // to `mz_storage_usage_by_shard`.
482            //
483            // TODO: Confirm the above reasoning, it might be outdated?
484            assert_ne!(
485                &*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
486                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
487            );
488
489            // `mz_catalog_raw` cannot be migrated because it contains the durable catalog and it
490            // wouldn't be very durable if we allowed it to be truncated.
491            assert_ne!(
492                &*MZ_CATALOG_RAW_DESCRIPTION, object,
493                "mz_catalog_raw cannot be migrated"
494            );
495
496            let Some(object_info) = self.system_objects.get(object) else {
497                panic!("migration step for non-existent builtin: {object:?}");
498            };
499
500            let builtin = object_info.builtin;
501            use Builtin::*;
502            assert!(
503                matches!(
504                    builtin,
505                    Table(..) | Source(..) | MaterializedView(..) | ContinualTask(..)
506                ),
507                "schema migration not supported for builtin: {builtin:?}",
508            );
509        }
510    }
511
512    /// Select for each object to migrate the appropriate migration mechanism.
513    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
514        // Ignore any steps at versions before `source_version`.
515        let steps = steps.iter().filter(|s| s.version > self.source_version);
516
517        // Select a mechanism for each object, according to the requested migrations:
518        //  * If any `Replacement` was requested, use `Replacement`.
519        //  * Otherwise, (i.e. only `Evolution` was requested), use `Evolution`.
520        let mut by_object = BTreeMap::new();
521        for step in steps {
522            if let Some(entry) = by_object.get_mut(&step.object) {
523                *entry = match (step.mechanism, *entry) {
524                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
525                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
526                        Mechanism::Replacement
527                    }
528                };
529            } else {
530                by_object.insert(step.object.clone(), step.mechanism);
531            }
532        }
533
534        let mut plan = Plan::default();
535        for (object, mechanism) in by_object {
536            match mechanism {
537                Mechanism::Evolution => plan.evolve.push(object),
538                Mechanism::Replacement => plan.replace.push(object),
539            }
540        }
541
542        plan
543    }
544
545    /// Plan a forced migration of all objects using the given mechanism.
546    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
547        let objects = self
548            .system_objects
549            .iter()
550            .filter(|(_, info)| {
551                use Builtin::*;
552                match info.builtin {
553                    Table(..) | MaterializedView(..) | ContinualTask(..) => true,
554                    Source(source) => **source != *MZ_CATALOG_RAW,
555                    Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
556                }
557            })
558            .map(|(object, _)| object.clone())
559            .collect();
560
561        let mut plan = Plan::default();
562        match mechanism {
563            Mechanism::Evolution => plan.evolve = objects,
564            Mechanism::Replacement => plan.replace = objects,
565        }
566
567        plan
568    }
569
570    /// Upgrade the migration shard to the target version.
571    async fn upgrade_migration_shard_version(&self) {
572        let persist = &self.config.persist_client;
573        let diagnostics = Diagnostics {
574            shard_name: "builtin_migration".to_string(),
575            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
576        };
577
578        persist
579            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
580                self.migration_shard,
581                diagnostics,
582            )
583            .await
584            .expect("valid usage");
585    }
586
587    /// Migrate the given objects using the `Evolution` mechanism.
588    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
589        for object in objects {
590            self.migrate_evolve_one(object).await?;
591        }
592        Ok(())
593    }
594
    /// Migrate a single object in-place using persist schema evolution.
    ///
    /// In read-only mode this only verifies that the target schema is backward compatible with
    /// the currently registered one; the actual evolution happens in leader mode.
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let persist = &self.config.persist_client;

        let Some(object_info) = self.system_objects.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let id = object_info.global_id;

        let Some(shard_id) = object_info.shard_id else {
            // No shard is registered for this builtin. In leader mode, this is fine, we'll
            // register the shard during bootstrap. In read-only mode, we might be racing with the
            // leader to register the shard and it's unclear what sort of confusion can arise from
            // that -- better to bail out in this case.
            if self.config.read_only {
                bail!("missing shard ID for builtin {object:?} ({id})");
            } else {
                return Ok(());
            }
        };

        // The schema we want to evolve to, taken from the builtin's static definition.
        let target_desc = match object_info.builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::MaterializedView(mv) => &mv.desc,
            Builtin::ContinualTask(ct) => &ct.desc,
            _ => bail!("not a storage collection: {object:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        if self.config.read_only {
            // In read-only mode, only check that the new schema is backward compatible.
            // We'll register it when/if we restart in leader mode.
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                         {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                // If no schema was previously registered, simply try to register the new one. This
                // might fail due to a concurrent registration, in which case we'll fall back to
                // `compare_and_evolve_schema`.

                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                if schema_id.is_some() {
                    // Registration succeeded: the shard now carries the target schema.
                    return Ok(());
                }

                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        loop {
            // Evolving the schema might fail if another process evolved the schema concurrently,
            // in which case we need to retry. Most likely the other process evolved the schema to
            // our own target schema and the second try is a no-op.

            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                     {source_desc:?} -> {target_desc:?}"
                ),
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    // Lost the race: retry against the schema the concurrent writer installed.
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }
725
726    /// Migrate the given objects using the `Replacement` mechanism.
727    async fn migrate_replace(
728        &self,
729        objects: &[SystemObjectDescription],
730    ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
731        if objects.is_empty() {
732            return Ok(Default::default());
733        }
734
735        let diagnostics = Diagnostics {
736            shard_name: "builtin_migration".to_string(),
737            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
738        };
739        let (mut persist_write, mut persist_read) =
740            self.open_migration_shard(diagnostics.clone()).await;
741
742        let mut ids_to_replace = BTreeSet::new();
743        for object in objects {
744            if let Some(info) = self.system_objects.get(object) {
745                ids_to_replace.insert(info.global_id);
746            } else {
747                bail!("missing id for builtin {object:?}");
748            }
749        }
750
751        info!(?objects, ?ids_to_replace, "migrating by replacement");
752
753        // Fetch replacement shard IDs from the migration shard, or insert new ones if none exist.
754        // This can fail due to writes by concurrent processes, so we need to retry.
755        let replaced_shards = loop {
756            if let Some(shards) = self
757                .try_get_or_insert_replacement_shards(
758                    &ids_to_replace,
759                    &mut persist_write,
760                    &mut persist_read,
761                )
762                .await?
763            {
764                break shards;
765            }
766        };
767
768        Ok(replaced_shards)
769    }
770
    /// Try to get or insert replacement shards for the given IDs into the migration shard, at
    /// `target_version` and `deploy_generation`.
    ///
    /// This method looks for existing entries in the migration shard and returns those if they
    /// are present. Otherwise it generates new shard IDs and tries to insert them.
    ///
    /// The result of this call is `None` if no existing entries were found and inserting new ones
    /// failed because of a concurrent write to the migration shard. In this case, the caller is
    /// expected to retry.
    async fn try_get_or_insert_replacement_shards(
        &self,
        ids_to_replace: &BTreeSet<GlobalId>,
        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
        // The shard's upper is the timestamp we will attempt to write at; the current contents
        // are readable at `write_ts - 1` (see `step_back` below).
        let upper = persist_write.fetch_recent_upper().await;
        let write_ts = *upper.as_option().expect("migration shard not sealed");

        // Work on a local copy: IDs found in the shard are removed from it, leaving only those
        // that still need freshly generated replacement shards.
        let mut ids_to_replace = ids_to_replace.clone();
        let mut replaced_shards = BTreeMap::new();

        // Another process might already have done a shard replacement at our version and
        // generation, in which case we can directly reuse the replacement shards.
        //
        // Note that we can't assume that the previous process had the same `ids_to_replace` as we
        // do. The set of migrations to run depends on both the source and the target version, and
        // the migration shard is not keyed by source version. The previous writer might have seen
        // a different source version, if there was a concurrent migration by a leader process.
        if let Some(read_ts) = write_ts.step_back() {
            // Only consider entries written for our exact target version and deploy generation.
            let pred = |key: &migration_shard::Key| {
                key.build_version == self.target_version
                    && key.deploy_generation == Some(self.deploy_generation)
            };
            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
                for (key, shard_id) in entries {
                    let id = GlobalId::System(key.global_id);
                    if ids_to_replace.remove(&id) {
                        replaced_shards.insert(id, shard_id);
                    }
                }

                debug!(
                    %read_ts, ?replaced_shards, ?ids_to_replace,
                    "found existing entries in migration shard",
                );
            }

            // All requested IDs were already present; no write to the shard required.
            if ids_to_replace.is_empty() {
                return Ok(Some(replaced_shards));
            }
        }

        // Generate new shard IDs and attempt to insert them into the migration shard. If we get a
        // CaA failure at `write_ts` that means a concurrent process has inserted in the meantime
        // and we need to re-check the migration shard contents.
        let mut updates = Vec::new();
        for id in ids_to_replace {
            let shard_id = ShardId::new();
            replaced_shards.insert(id, shard_id);

            let GlobalId::System(global_id) = id else {
                bail!("attempt to migrate a non-system collection: {id}");
            };
            let key = migration_shard::Key {
                global_id,
                build_version: self.target_version.clone(),
                deploy_generation: Some(self.deploy_generation),
            };
            // Diff of +1: an insertion into the migration shard.
            updates.push(((key, shard_id), write_ts, 1));
        }

        let upper = Antichain::from_elem(write_ts);
        let new_upper = Antichain::from_elem(write_ts.step_forward());
        debug!(%write_ts, "attempting insert into migration shard");
        let result = persist_write
            .compare_and_append(updates, upper, new_upper)
            .await
            .expect("valid usage");

        match result {
            Ok(()) => {
                debug!(
                    %write_ts, ?replaced_shards,
                    "successfully inserted into migration shard"
                );
                Ok(Some(replaced_shards))
            }
            // Upper mismatch: a concurrent writer got in first; signal the caller to retry.
            Err(_mismatch) => Ok(None),
        }
    }
861
862    /// Open writer and reader for the migration shard.
863    async fn open_migration_shard(
864        &self,
865        diagnostics: Diagnostics,
866    ) -> (
867        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
868        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
869    ) {
870        let persist = &self.config.persist_client;
871
872        persist
873            .open(
874                self.migration_shard,
875                Arc::new(migration_shard::KeySchema),
876                Arc::new(ShardIdSchema),
877                diagnostics,
878                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
879            )
880            .await
881            .expect("valid usage")
882    }
883
884    /// Open a [`SinceHandle`] for the migration shard.
885    async fn open_migration_shard_since(
886        &self,
887        diagnostics: Diagnostics,
888    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
889        self.config
890            .persist_client
891            .open_critical_since(
892                self.migration_shard,
893                // TODO: We may need to use a different critical reader
894                // id for this if we want to be able to introspect it via SQL.
895                PersistClient::CONTROLLER_CRITICAL_SINCE,
896                Opaque::encode(&i64::MIN),
897                diagnostics.clone(),
898            )
899            .await
900            .expect("valid usage")
901    }
902
903    /// Update the fingerprints for `migrated_items`.
904    ///
905    /// Returns the new fingerprints. Also asserts that the current fingerprints of all other
906    /// system items match their builtin definitions.
907    fn update_fingerprints(
908        &self,
909        migrated_items: &BTreeSet<SystemObjectDescription>,
910    ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
911        let mut new_fingerprints = BTreeMap::new();
912        for (object, object_info) in &self.system_objects {
913            let id = object_info.global_id;
914            let builtin = object_info.builtin;
915
916            let fingerprint = builtin.fingerprint();
917            if fingerprint == object_info.fingerprint {
918                continue; // fingerprint unchanged, nothing to do
919            }
920
921            // Fingerprint mismatch is expected for a migrated item.
922            let migrated = migrated_items.contains(object);
923            // Some builtin types have schemas but no durable state. No migration needed for those.
924            let ephemeral = matches!(
925                builtin,
926                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
927            );
928
929            if migrated || ephemeral {
930                new_fingerprints.insert(object.clone(), fingerprint);
931            } else if builtin.runtime_alterable() {
932                // Runtime alterable builtins have no meaningful builtin fingerprint, and a
933                // sentinel value stored in the catalog.
934                assert_eq!(
935                    object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
936                    "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
937                );
938            } else {
939                panic!(
940                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
941                    fingerprint, object_info.fingerprint,
942                );
943            }
944        }
945
946        Ok(new_fingerprints)
947    }
948
    /// Perform cleanup of migration state, i.e. the migration shard.
    ///
    /// Returns a list of shards to finalize, and a `Future` that must be run after the shard
    /// finalization has been durably enqueued. The `Future` is used to remove entries from the
    /// migration shard only after we know the respective shards will be finalized. Removing
    /// entries immediately would risk leaking the shards.
    ///
    /// We only perform cleanup in leader mode, to keep the durable state changes made by
    /// read-only processes as minimal as possible. Given that Materialize doesn't support version
    /// downgrades, it is safe to assume that any state for versions below the `target_version` is
    /// not needed anymore and can be cleaned up.
    ///
    /// Note that it is fine for cleanup to sometimes fail or be skipped. The size of the migration
    /// shard should always be pretty small, so keeping migration state around for longer isn't a
    /// concern. As a result, we can keep the logic simple here and skip doing cleanup in response
    /// to transient failures, instead of retrying.
    async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
        // Result for the early-return cases in which there is nothing to clean up.
        let noop_action = async {}.boxed();
        let noop_result = (BTreeSet::new(), noop_action);

        if self.config.read_only {
            return Ok(noop_result);
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: "builtin schema migration cleanup".into(),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;
        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;

        let upper = persist_write.fetch_recent_upper().await.clone();
        let write_ts = *upper.as_option().expect("migration shard not sealed");
        // If the upper can't be stepped back it is still at the minimum timestamp, i.e. nothing
        // was ever written to the shard and there is nothing to read or retract.
        let Some(read_ts) = write_ts.step_back() else {
            return Ok(noop_result);
        };

        // Collect old entries to remove.
        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
        else {
            return Ok(noop_result);
        };

        debug!(
            ?stale_entries,
            "cleaning migration shard up to version {}", self.target_version,
        );

        // Shards currently backing builtin collections; these must not be finalized.
        let current_shards: BTreeMap<_, _> = self
            .system_objects
            .values()
            .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
            .collect();

        let mut shards_to_finalize = BTreeSet::new();
        let mut retractions = Vec::new();
        for (key, shard_id) in stale_entries {
            // The migration shard contains both shards created during aborted upgrades and shards
            // created during successful upgrades. The latter may still be in use, so we have to
            // check and only finalize those that aren't anymore.
            let gid = GlobalId::System(key.global_id);
            if current_shards.get(&gid) != Some(&shard_id) {
                shards_to_finalize.insert(shard_id);
            }

            // Diff of -1: retract the entry, regardless of whether its shard gets finalized.
            retractions.push(((key, shard_id), write_ts, -1));
        }

        // The returned action applies the retractions. The caller must run it only after shard
        // finalization was durably enqueued (see the method docs above).
        let cleanup_action = async move {
            if !retractions.is_empty() {
                let new_upper = Antichain::from_elem(write_ts.step_forward());
                let result = persist_write
                    .compare_and_append(retractions, upper, new_upper)
                    .await
                    .expect("valid usage");
                match result {
                    Ok(()) => debug!("cleaned up migration shard"),
                    // A concurrent write invalidated our `write_ts`; skip cleanup this time,
                    // which is fine (see the method docs above).
                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
                }
            }
        }
        .boxed();

        // Downgrade the since, to enable some compaction.
        let o = persist_since.opaque().clone();
        let new_since = Antichain::from_elem(read_ts);
        let result = persist_since
            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
            .await;
        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");

        Ok((shards_to_finalize, cleanup_action))
    }
1044}
1045
1046/// Read the migration shard at the given timestamp, returning all entries that match the given
1047/// predicate.
1048///
1049/// Returns `None` if the migration shard contains no matching entries, or if it isn't readable at
1050/// `read_ts`.
1051async fn read_migration_shard<P>(
1052    persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
1053    read_ts: Timestamp,
1054    predicate: P,
1055) -> Option<Vec<(migration_shard::Key, ShardId)>>
1056where
1057    P: for<'a> Fn(&migration_shard::Key) -> bool,
1058{
1059    let as_of = Antichain::from_elem(read_ts);
1060    let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
1061
1062    assert!(
1063        updates.iter().all(|(_, _, diff)| *diff == 1),
1064        "migration shard contains invalid diffs: {updates:?}",
1065    );
1066
1067    let entries: Vec<_> = updates
1068        .into_iter()
1069        .map(|(data, _, _)| data)
1070        .filter(move |(key, _)| predicate(key))
1071        .collect();
1072
1073    (!entries.is_empty()).then_some(entries)
1074}
1075
/// A plan to migrate between two versions.
///
/// Objects are grouped by the migration mechanism to apply to them; see the module docs for a
/// description of the mechanisms.
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate using the `Evolution` mechanism.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate using the `Replacement` mechanism.
    replace: Vec<SystemObjectDescription>,
}
1084
/// Types and persist codec impls for the migration shard used by the `Replacement` mechanism.
///
/// The shard maps [`Key`]s, identifying a builtin collection at a build version (and deploy
/// generation), to the `ShardId`s of their replacement shards.
mod migration_shard {
    use std::fmt;
    use std::str::FromStr;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;
    use semver::Version;
    use serde::{Deserialize, Serialize};

    /// Key of the migration shard, identifying one replaced builtin collection.
    ///
    /// Keys are persisted in their `Display` string form (see the `Codec` impl below), so the
    /// `Display`/`FromStr` encoding must stay stable and able to read all historical formats.
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
    pub(super) struct Key {
        /// Raw ID of the `GlobalId::System` whose shard was replaced.
        pub(super) global_id: u64,
        /// The build version the replacement shard was created for.
        pub(super) build_version: Version,
        // Versions < 26.0 didn't include the deploy generation. As long as we still might
        // encounter migration shard entries that don't have it, we need to keep this an `Option`
        // and keep supporting both key formats.
        pub(super) deploy_generation: Option<u64>,
    }

    /// Formats the key as it is persisted in the shard: JSON for current keys,
    /// `{global_id}-{build_version}` for pre-26.0 keys (those without a deploy generation).
    impl fmt::Display for Key {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            if self.deploy_generation.is_some() {
                // current format
                let s = serde_json::to_string(self).expect("JSON serializable");
                f.write_str(&s)
            } else {
                // pre-26.0 format
                write!(f, "{}-{}", self.global_id, self.build_version)
            }
        }
    }

    /// Parses both key formats; the inverse of the `Display` impl above.
    impl FromStr for Key {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, String> {
            // current format
            if let Ok(key) = serde_json::from_str(s) {
                return Ok(key);
            };

            // pre-26.0 format
            // `splitn(2, ..)` rather than `split`: the version string may itself contain '-'
            // (e.g. semver prerelease versions), so only split on the first one.
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid Key '{s}'"));
            };
            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
            let build_version = build_version
                .parse::<Version>()
                .map_err(|e| e.to_string())?;
            Ok(Key {
                global_id,
                build_version,
                deploy_generation: None,
            })
        }
    }

    // NOTE(review): presumably needed as the initial value overwritten by
    // `SimpleColumnarData::read` during decoding -- confirm.
    impl Default for Key {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: Version::new(0, 0, 0),
                deploy_generation: Some(0),
            }
        }
    }

    /// Persist codec: keys travel as their `Display`/`FromStr` string form in UTF-8 bytes.
    impl Codec for Key {
        type Schema = KeySchema;
        type Storage = ();

        fn codec_name() -> String {
            // NOTE(review): the name doesn't match the type; it likely must stay as-is for
            // compatibility with what's already registered for existing shards -- confirm
            // before changing.
            "TableKey".into()
        }

        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put(self.to_string().as_bytes())
        }

        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
            s.parse()
        }

        // `Key` has no separate schema payload; the schema is encoded as empty bytes.
        fn encode_schema(_schema: &KeySchema) -> Bytes {
            Bytes::new()
        }

        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            KeySchema
        }
    }

    /// Columnar representation: keys are stored as their string form in a single arrow
    /// string column.
    impl SimpleColumnarData for Key {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            // Byte length of all string values appended so far.
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }

        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }

        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("valid Key");
        }
    }

    /// Persist schema for [`Key`]; stateless, as keys need no extra schema information.
    #[derive(Debug, PartialEq)]
    pub(super) struct KeySchema;

    impl Schema<Key> for KeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;
        type Decoder = SimpleColumnarDecoder<Key>;
        type Encoder = SimpleColumnarEncoder<Key>;

        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}
1226
// Unit tests live in a sibling file, wired up via the `#[path]` attribute.
#[cfg(test)]
#[path = "builtin_schema_migration_tests.rs"]
mod tests;