// mz_adapter/catalog/open/builtin_schema_migration.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for migrating the schemas of builtin storage collections.
//!
//! If a version upgrade changes the schema of a builtin collection that's made durable in persist,
//! that persist shard's schema must be migrated accordingly. The migration must happen in a way
//! that's compatible with 0dt upgrades: Read-only environments need to be able to read the
//! collections with the new schema, without interfering with the leader environment's continued
//! use of the old schema.
//!
//! Two migration mechanisms are provided:
//!
//!  * [`Mechanism::Evolution`] uses persist's schema evolution support to evolve the persist
//!    shard's schema in-place. Only works for backward-compatible changes.
//!  * [`Mechanism::Replacement`] creates a new shard to serve the builtin collection in the new
//!    version. Works for all schema changes but discards existing data.
//!
//! Which mechanism to use is selected through entries in the `MIGRATIONS` list. In general, the
//! `Evolution` mechanism should be used when possible, as it avoids data loss.
//!
//! For more context and details on the implementation, see
//! `doc/developer/design/20251015_builtin_schema_migration.md`.
31use std::collections::{BTreeMap, BTreeSet};
32use std::sync::{Arc, LazyLock};
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39    BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40    MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, MZ_STORAGE_USAGE_BY_SHARD,
41    MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
42};
43use mz_catalog::config::BuiltinItemMigrationConfig;
44use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
45use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
46use mz_catalog::memory::error::{Error, ErrorKind};
47use mz_ore::soft_assert_or_log;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
49use mz_persist_client::critical::{Opaque, SinceHandle};
50use mz_persist_client::read::ReadHandle;
51use mz_persist_client::schema::CaESchema;
52use mz_persist_client::write::WriteHandle;
53use mz_persist_client::{Diagnostics, PersistClient};
54use mz_persist_types::ShardId;
55use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
56use mz_persist_types::schema::backward_compatible;
57use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
58use mz_repr::{CatalogItemId, GlobalId, Timestamp};
59use mz_sql::catalog::{CatalogItemType, NameReference};
60use mz_storage_client::controller::StorageTxn;
61use mz_storage_types::StorageDiff;
62use mz_storage_types::sources::SourceData;
63use semver::Version;
64use timely::progress::Antichain;
65use tracing::{debug, info};
66
67use crate::catalog::migrate::get_migration_version;
68
69/// Builtin schema migrations required to upgrade to the current build version.
70///
71/// Migration steps for old versions must be retained around according to the upgrade policy. For
72/// example, if we support upgrading one major version at a time, the release of version `N.0.0`
73/// can delete all migration steps with versions before `(N-1).0.0`.
74///
75/// Smallest supported version: 0.147.0
76static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
77    vec![
78        MigrationStep::replacement(
79            "0.149.0",
80            CatalogItemType::Source,
81            MZ_INTERNAL_SCHEMA,
82            "mz_sink_statistics_raw",
83        ),
84        MigrationStep::replacement(
85            "0.149.0",
86            CatalogItemType::Source,
87            MZ_INTERNAL_SCHEMA,
88            "mz_source_statistics_raw",
89        ),
90        MigrationStep::evolution(
91            "0.159.0",
92            CatalogItemType::Source,
93            MZ_INTERNAL_SCHEMA,
94            "mz_cluster_replica_metrics_history",
95        ),
96        MigrationStep::replacement(
97            "0.160.0",
98            CatalogItemType::Table,
99            MZ_CATALOG_SCHEMA,
100            "mz_roles",
101        ),
102        MigrationStep::replacement(
103            "0.160.0",
104            CatalogItemType::Table,
105            MZ_CATALOG_SCHEMA,
106            "mz_sinks",
107        ),
108        MigrationStep::replacement(
109            "26.18.0-dev.0",
110            CatalogItemType::MaterializedView,
111            MZ_CATALOG_SCHEMA,
112            "mz_databases",
113        ),
114        MigrationStep::replacement(
115            "26.19.0-dev.0",
116            CatalogItemType::MaterializedView,
117            MZ_CATALOG_SCHEMA,
118            "mz_schemas",
119        ),
120        MigrationStep::replacement(
121            "26.19.0-dev.0",
122            CatalogItemType::MaterializedView,
123            MZ_CATALOG_SCHEMA,
124            "mz_role_members",
125        ),
126        MigrationStep::replacement(
127            "26.19.0-dev.0",
128            CatalogItemType::MaterializedView,
129            MZ_INTERNAL_SCHEMA,
130            "mz_network_policies",
131        ),
132        MigrationStep::replacement(
133            "26.19.0-dev.0",
134            CatalogItemType::MaterializedView,
135            MZ_INTERNAL_SCHEMA,
136            "mz_network_policy_rules",
137        ),
138        MigrationStep::replacement(
139            "26.19.0-dev.0",
140            CatalogItemType::MaterializedView,
141            MZ_INTERNAL_SCHEMA,
142            "mz_cluster_workload_classes",
143        ),
144        MigrationStep::replacement(
145            "26.19.0-dev.0",
146            CatalogItemType::MaterializedView,
147            MZ_INTERNAL_SCHEMA,
148            "mz_internal_cluster_replicas",
149        ),
150        MigrationStep::replacement(
151            "26.19.0-dev.0",
152            CatalogItemType::MaterializedView,
153            MZ_INTERNAL_SCHEMA,
154            "mz_pending_cluster_replicas",
155        ),
156        MigrationStep::replacement(
157            "26.20.0-dev.0",
158            CatalogItemType::MaterializedView,
159            MZ_CATALOG_SCHEMA,
160            "mz_materialized_views",
161        ),
162        MigrationStep::replacement(
163            "26.22.0-dev.0",
164            CatalogItemType::MaterializedView,
165            MZ_CATALOG_SCHEMA,
166            "mz_connections",
167        ),
168        MigrationStep::replacement(
169            "26.22.0-dev.0",
170            CatalogItemType::MaterializedView,
171            MZ_CATALOG_SCHEMA,
172            "mz_secrets",
173        ),
174    ]
175});
176
/// A migration required to upgrade past a specific version.
///
/// Steps are declared in the `MIGRATIONS` list and filtered against the durable catalog's
/// last-migrated version when a migration is planned.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The build version that requires this migration.
    version: Version,
    /// The object that requires migration.
    object: SystemObjectDescription,
    /// The migration mechanism to be used.
    mechanism: Mechanism,
}
187
188impl MigrationStep {
189    /// Helper to construct an `Evolution` migration step.
190    fn evolution(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
191        Self {
192            version: Version::parse(version).expect("valid"),
193            object: SystemObjectDescription {
194                schema_name: schema.into(),
195                object_type: type_,
196                object_name: name.into(),
197            },
198            mechanism: Mechanism::Evolution,
199        }
200    }
201
202    /// Helper to construct a `Replacement` migration step.
203    fn replacement(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
204        Self {
205            version: Version::parse(version).expect("valid"),
206            object: SystemObjectDescription {
207                schema_name: schema.into(),
208                object_type: type_,
209                object_name: name.into(),
210            },
211            mechanism: Mechanism::Replacement,
212        }
213    }
214}
215
/// The mechanism to use to migrate the schema of a builtin collection.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
// NOTE(review): `dead_code` is allowed presumably because, depending on the current contents of
// `MIGRATIONS`, either variant may go unconstructed in a given release -- confirm.
#[allow(dead_code)]
enum Mechanism {
    /// Persist schema evolution.
    ///
    /// Keeps existing contents but only works for schema changes that are backward compatible
    /// according to [`backward_compatible`].
    Evolution,
    /// Shard replacement.
    ///
    /// Works for arbitrary schema changes but loses existing contents.
    Replacement,
}
230
/// The result of a builtin schema migration, as returned by [`run`].
pub(super) struct MigrationResult {
    /// IDs of items whose shards have been replaced using the `Replacement` mechanism.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// A cleanup action to take once the migration has been made durable.
    pub cleanup_action: BoxFuture<'static, ()>,
}
238
239impl Default for MigrationResult {
240    fn default() -> Self {
241        Self {
242            replaced_items: Default::default(),
243            cleanup_action: async {}.boxed(),
244        }
245    }
246}
247
/// Run builtin schema migrations.
///
/// This is the entry point used by adapter when opening the catalog. It uses the hardcoded
/// `BUILTINS` and `MIGRATIONS` lists to initialize the lists of available builtins and required
/// migrations, respectively.
///
/// Returns the IDs of replaced items and a cleanup action to run after the transaction has been
/// committed, or an error if the migration could not be performed.
pub(super) async fn run(
    build_info: &BuildInfo,
    deploy_generation: u64,
    txn: &mut Transaction<'_>,
    config: BuiltinItemMigrationConfig,
) -> Result<MigrationResult, Error> {
    // Sanity check to ensure we're not touching durable state in read-only mode.
    assert_eq!(config.read_only, txn.is_savepoint());

    // Tests may provide a dummy build info that confuses the migration step selection logic. Skip
    // migrations if we observe this build info.
    if *build_info == DUMMY_BUILD_INFO {
        return Ok(MigrationResult::default());
    }

    let Some(durable_version) = get_migration_version(txn) else {
        // New catalog; nothing to do.
        return Ok(MigrationResult::default());
    };
    let build_version = build_info.semver_version();

    // Collect information about every builtin system object, panicking on any durable mapping
    // that doesn't correspond to a known builtin.
    let collection_metadata = txn.get_collection_metadata();
    let system_objects = txn
        .get_system_object_mappings()
        .map(|m| {
            let object = m.description;
            let global_id = m.unique_identifier.global_id;
            // A builtin may not have a shard registered yet (e.g. before bootstrap).
            let shard_id = collection_metadata.get(&global_id).copied();
            let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
                panic!("missing builtin {object:?}");
            };
            let info = ObjectInfo {
                global_id,
                shard_id,
                builtin,
                fingerprint: m.unique_identifier.fingerprint,
            };
            (object, info)
        })
        .collect();

    let migration_shard = txn.get_builtin_migration_shard().expect("must exist");

    let migration = Migration {
        source_version: durable_version.clone(),
        target_version: build_version.clone(),
        deploy_generation,
        system_objects,
        migration_shard,
        config,
    };

    let result = migration.run(&MIGRATIONS).await.map_err(|e| {
        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
            last_seen_version: durable_version.to_string(),
            this_version: build_version.to_string(),
            cause: e.to_string(),
        })
    })?;

    // Write the migration's effects (collection metadata, unfinalized shards, fingerprints) into
    // the transaction.
    result.apply(txn);

    // Translate the replaced collections' `GlobalId`s into `CatalogItemId`s for the caller.
    let replaced_items = txn
        .get_system_object_mappings()
        .map(|m| m.unique_identifier)
        .filter(|ids| result.new_shards.contains_key(&ids.global_id))
        .map(|ids| ids.catalog_id)
        .collect();

    Ok(MigrationResult {
        replaced_items,
        cleanup_action: result.cleanup_action,
    })
}
327
/// Result produced by `Migration::run`.
struct MigrationRunResult {
    /// New shards created by `Replacement` migrations, keyed by the collection's `GlobalId`.
    new_shards: BTreeMap<GlobalId, ShardId>,
    /// Updated fingerprints for all migrated objects.
    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
    /// Shards that should be registered for finalization.
    shards_to_finalize: BTreeSet<ShardId>,
    /// A cleanup action to take once the migration has been made durable.
    cleanup_action: BoxFuture<'static, ()>,
}
335
336impl Default for MigrationRunResult {
337    fn default() -> Self {
338        Self {
339            new_shards: BTreeMap::new(),
340            new_fingerprints: BTreeMap::new(),
341            shards_to_finalize: BTreeSet::new(),
342            cleanup_action: async {}.boxed(),
343        }
344    }
345}
346
347impl MigrationRunResult {
348    /// Apply this migration result to the given transaction.
349    fn apply(&self, txn: &mut Transaction<'_>) {
350        // Update collection metadata.
351        let replaced_ids = self.new_shards.keys().copied().collect();
352        let old_metadata = txn.delete_collection_metadata(replaced_ids);
353        txn.insert_collection_metadata(self.new_shards.clone())
354            .expect("inserting unique shards IDs after deleting existing entries");
355
356        // Register shards for finalization.
357        let mut unfinalized_shards: BTreeSet<_> =
358            old_metadata.into_iter().map(|(_, sid)| sid).collect();
359        unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
360        txn.insert_unfinalized_shards(unfinalized_shards)
361            .expect("cannot fail");
362
363        // Update fingerprints.
364        let mappings = txn
365            .get_system_object_mappings()
366            .filter_map(|m| {
367                let fingerprint = self.new_fingerprints.get(&m.description)?;
368                Some(SystemObjectMapping {
369                    description: m.description,
370                    unique_identifier: SystemObjectUniqueIdentifier {
371                        catalog_id: m.unique_identifier.catalog_id,
372                        global_id: m.unique_identifier.global_id,
373                        fingerprint: fingerprint.clone(),
374                    },
375                })
376            })
377            .collect();
378        txn.set_system_object_mappings(mappings)
379            .expect("filtered existing mappings remain unique");
380    }
381}
382
/// Information about a system object required to run a `Migration`.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// The `GlobalId` of the object's storage collection.
    global_id: GlobalId,
    /// The persist shard backing the collection, if one is registered.
    shard_id: Option<ShardId>,
    /// The static builtin definition of this object.
    builtin: &'static Builtin<NameReference>,
    /// The object's fingerprint, as recorded in the durable catalog.
    fingerprint: String,
}
391
/// Context of a builtin schema migration.
struct Migration {
    /// The version we are migrating from.
    ///
    /// Same as the build version of the most recent leader process that successfully performed
    /// migrations.
    source_version: Version,
    /// The version we are migrating to.
    ///
    /// Same as the build version of this process.
    target_version: Version,
    /// The deploy generation of this process.
    deploy_generation: u64,
    /// Information about all objects in the system.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// The ID of the migration shard.
    migration_shard: ShardId,
    /// Additional configuration.
    config: BuiltinItemMigrationConfig,
}
412
413impl Migration {
    /// Run the migration, executing whichever of the given steps apply.
    ///
    /// Returns the effects of the migration, to be applied to the durable catalog by the caller.
    async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
        info!(
            deploy_generation = %self.deploy_generation,
            "running builtin schema migration: {} -> {}",
            self.source_version, self.target_version
        );

        self.validate_migration_steps(steps);

        // Version-based migration filter fails for dev versions, see for example
        // https://github.com/MaterializeInc/database-issues/issues/11335
        let force_migration = if self.source_version != self.target_version
            && self.source_version.pre.as_str().starts_with("dev")
            && self.config.force_migration.is_none()
        {
            Some("evolution".to_string())
        } else {
            self.config.force_migration.clone()
        };

        // Either plan based on the applicable steps, or force-migrate everything with a single
        // mechanism if requested.
        let (force, plan) = match force_migration.as_deref() {
            None => (false, self.plan_migration(steps)),
            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
            Some(other) => panic!("unknown force migration mechanism: {other}"),
        };

        if self.source_version == self.target_version && !force {
            info!("skipping migration: already at target version");
            return Ok(MigrationRunResult::default());
        } else if self.source_version > self.target_version {
            bail!("downgrade not supported");
        }

        // In leader mode, upgrade the version of the migration shard to the target version.
        // This fences out any readers at lower versions.
        if !self.config.read_only {
            self.upgrade_migration_shard_version().await;
        }

        info!("executing migration plan: {plan:?}");

        self.migrate_evolve(&plan.evolve).await?;
        let new_shards = self.migrate_replace(&plan.replace).await?;

        let mut migrated_objects = BTreeSet::new();
        migrated_objects.extend(plan.evolve);
        migrated_objects.extend(plan.replace);

        // Refresh the recorded fingerprints of everything we migrated.
        let new_fingerprints = self.update_fingerprints(&migrated_objects)?;

        let (shards_to_finalize, cleanup_action) = self.cleanup().await?;

        Ok(MigrationRunResult {
            new_shards,
            new_fingerprints,
            shards_to_finalize,
            cleanup_action,
        })
    }
474
    /// Sanity check the given migration steps.
    ///
    /// If any of these checks fail, that's a bug in Materialize, and we panic immediately.
    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
        for step in steps {
            // A step at a version beyond this build could never apply; its presence indicates a
            // typo or a forgotten version bump.
            assert!(
                step.version <= self.target_version,
                "migration step version greater than target version: {} > {}",
                step.version,
                self.target_version,
            );

            let object = &step.object;

            // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly, it would
            // cause the table to be truncated because the contents are not also stored in the durable
            // catalog. Secondly, we prune `mz_storage_usage_by_shard` of old events in the background
            // on startup. The correctness of that pruning relies on there being no other retractions
            // to `mz_storage_usage_by_shard`.
            //
            // TODO: Confirm the above reasoning, it might be outdated?
            assert_ne!(
                &*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
            );

            // Same hazard as `mz_storage_usage_by_shard`: the startup pruner
            // (`Coordinator::prune_arrangement_sizes_history_on_startup`) assumes it is
            // the only source of retractions, so migration-driven truncation would break it.
            assert_ne!(
                &*MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, object,
                "mz_object_arrangement_size_history cannot be migrated or else the table will be truncated"
            );

            // `mz_catalog_raw` cannot be migrated because it contains the durable catalog and it
            // wouldn't be very durable if we allowed it to be truncated.
            assert_ne!(
                &*MZ_CATALOG_RAW_DESCRIPTION, object,
                "mz_catalog_raw cannot be migrated"
            );

            let Some(object_info) = self.system_objects.get(object) else {
                panic!("migration step for non-existent builtin: {object:?}");
            };

            // Only builtin storage collections can have their shard schemas migrated.
            let builtin = object_info.builtin;
            use Builtin::*;
            assert!(
                matches!(builtin, Table(..) | Source(..) | MaterializedView(..)),
                "schema migration not supported for builtin: {builtin:?}",
            );
        }
    }
528
529    /// Select for each object to migrate the appropriate migration mechanism.
530    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
531        // Ignore any steps at versions before `source_version`.
532        let steps = steps.iter().filter(|s| s.version > self.source_version);
533
534        // Select a mechanism for each object, according to the requested migrations:
535        //  * If any `Replacement` was requested, use `Replacement`.
536        //  * Otherwise, (i.e. only `Evolution` was requested), use `Evolution`.
537        let mut by_object = BTreeMap::new();
538        for step in steps {
539            if let Some(entry) = by_object.get_mut(&step.object) {
540                *entry = match (step.mechanism, *entry) {
541                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
542                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
543                        Mechanism::Replacement
544                    }
545                };
546            } else {
547                by_object.insert(step.object.clone(), step.mechanism);
548            }
549        }
550
551        let mut plan = Plan::default();
552        for (object, mechanism) in by_object {
553            match mechanism {
554                Mechanism::Evolution => plan.evolve.push(object),
555                Mechanism::Replacement => plan.replace.push(object),
556            }
557        }
558
559        plan
560    }
561
562    /// Plan a forced migration of all objects using the given mechanism.
563    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
564        let objects = self
565            .system_objects
566            .iter()
567            .filter(|(_, info)| {
568                use Builtin::*;
569                match info.builtin {
570                    // Filter out the 'mz_storage_usage_by_shard' table since we need to retain
571                    // that info for billing purposes.
572                    Table(table) => **table != *MZ_STORAGE_USAGE_BY_SHARD,
573                    MaterializedView(..) => true,
574                    Source(source) => **source != *MZ_CATALOG_RAW,
575                    Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
576                }
577            })
578            .map(|(object, _)| object.clone())
579            .collect();
580
581        let mut plan = Plan::default();
582        match mechanism {
583            Mechanism::Evolution => plan.evolve = objects,
584            Mechanism::Replacement => plan.replace = objects,
585        }
586
587        plan
588    }
589
    /// Upgrade the migration shard to the target version.
    ///
    /// Doing so fences out any readers of the migration shard at lower versions; must only be
    /// called in leader mode.
    async fn upgrade_migration_shard_version(&self) {
        let persist = &self.config.persist_client;
        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
        };

        persist
            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
                self.migration_shard,
                diagnostics,
            )
            .await
            .expect("valid usage");
    }
606
    /// Migrate the given objects using the `Evolution` mechanism.
    ///
    /// Objects are migrated one at a time; the first failure aborts the remaining migrations.
    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
        for object in objects {
            self.migrate_evolve_one(object).await?;
        }
        Ok(())
    }
614
    /// Migrate a single object using the `Evolution` mechanism.
    ///
    /// In read-only mode this only verifies that the target schema is a backward-compatible
    /// evolution of the currently registered one. In leader mode it registers or evolves the
    /// shard schema in persist, retrying on concurrent schema changes.
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let persist = &self.config.persist_client;

        let Some(object_info) = self.system_objects.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let id = object_info.global_id;

        let Some(shard_id) = object_info.shard_id else {
            // No shard is registered for this builtin. In leader mode, this is fine, we'll
            // register the shard during bootstrap. In read-only mode, we might be racing with the
            // leader to register the shard and it's unclear what sort of confusion can arise from
            // that -- better to bail out in this case.
            if self.config.read_only {
                bail!("missing shard ID for builtin {object:?} ({id})");
            } else {
                return Ok(());
            }
        };

        // The target schema is the builtin's declared description in this build.
        let target_desc = match object_info.builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::MaterializedView(mv) => &mv.desc,
            _ => bail!("not a storage collection: {object:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        if self.config.read_only {
            // In read-only mode, only check that the new schema is backward compatible.
            // We'll register it when/if we restart in leader mode.
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                         {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                // If no schema was previously registered, simply try to register the new one. This
                // might fail due to a concurrent registration, in which case we'll fall back to
                // `compare_and_evolve_schema`.

                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                // `Some` means our registration won, so the shard now carries the target schema.
                if schema_id.is_some() {
                    return Ok(());
                }

                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        loop {
            // Evolving the schema might fail if another process evolved the schema concurrently,
            // in which case we need to retry. Most likely the other process evolved the schema to
            // our own target schema and the second try is a no-op.

            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                     {source_desc:?} -> {target_desc:?}"
                ),
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    // A concurrent process changed the schema; retry against its version.
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }
744
745    /// Migrate the given objects using the `Replacement` mechanism.
746    async fn migrate_replace(
747        &self,
748        objects: &[SystemObjectDescription],
749    ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
750        if objects.is_empty() {
751            return Ok(Default::default());
752        }
753
754        let diagnostics = Diagnostics {
755            shard_name: "builtin_migration".to_string(),
756            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
757        };
758        let (mut persist_write, mut persist_read) =
759            self.open_migration_shard(diagnostics.clone()).await;
760
761        let mut ids_to_replace = BTreeSet::new();
762        for object in objects {
763            if let Some(info) = self.system_objects.get(object) {
764                ids_to_replace.insert(info.global_id);
765            } else {
766                bail!("missing id for builtin {object:?}");
767            }
768        }
769
770        info!(?objects, ?ids_to_replace, "migrating by replacement");
771
772        // Fetch replacement shard IDs from the migration shard, or insert new ones if none exist.
773        // This can fail due to writes by concurrent processes, so we need to retry.
774        let replaced_shards = loop {
775            if let Some(shards) = self
776                .try_get_or_insert_replacement_shards(
777                    &ids_to_replace,
778                    &mut persist_write,
779                    &mut persist_read,
780                )
781                .await?
782            {
783                break shards;
784            }
785        };
786
787        Ok(replaced_shards)
788    }
789
    /// Try to get or insert replacement shards for the given IDs into the migration shard, at
    /// `target_version` and `deploy_generation`.
    ///
    /// This method looks for existing entries in the migration shards and returns those if they
    /// are present. Otherwise it generates new shard IDs and tries to insert them.
    ///
    /// The result of this call is `None` if no existing entries were found and inserting new ones
    /// failed because of a concurrent write to the migration shard. In this case, the caller is
    /// expected to retry.
    async fn try_get_or_insert_replacement_shards(
        &self,
        ids_to_replace: &BTreeSet<GlobalId>,
        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
        // The shard's upper is the next timestamp we can write at; reads happen one tick earlier
        // (see `step_back` below).
        let upper = persist_write.fetch_recent_upper().await;
        let write_ts = *upper.as_option().expect("migration shard not sealed");

        // IDs still in need of a replacement shard; shrinks as we discover existing entries.
        let mut ids_to_replace = ids_to_replace.clone();
        let mut replaced_shards = BTreeMap::new();

        // Another process might already have done a shard replacement at our version and
        // generation, in which case we can directly reuse the replacement shards.
        //
        // Note that we can't assume that the previous process had the same `ids_to_replace` as we
        // do. The set of migrations to run depends on both the source and the target version, and
        // the migration shard is not keyed by source version. The previous writer might have seen
        // a different source version, if there was a concurrent migration by a leader process.
        if let Some(read_ts) = write_ts.step_back() {
            let pred = |key: &migration_shard::Key| {
                key.build_version == self.target_version
                    && key.deploy_generation == Some(self.deploy_generation)
            };
            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
                for (key, shard_id) in entries {
                    let id = GlobalId::System(key.global_id);
                    if ids_to_replace.remove(&id) {
                        replaced_shards.insert(id, shard_id);
                    }
                }

                debug!(
                    %read_ts, ?replaced_shards, ?ids_to_replace,
                    "found existing entries in migration shard",
                );
            }

            // All requested IDs were covered by existing entries; no write needed.
            if ids_to_replace.is_empty() {
                return Ok(Some(replaced_shards));
            }
        }

        // Generate new shard IDs and attempt to insert them into the migration shard. If we get a
        // CaA failure at `write_ts` that means a concurrent process has inserted in the meantime
        // and we need to re-check the migration shard contents.
        let mut updates = Vec::new();
        for id in ids_to_replace {
            let shard_id = ShardId::new();
            replaced_shards.insert(id, shard_id);

            let GlobalId::System(global_id) = id else {
                bail!("attempt to migrate a non-system collection: {id}");
            };
            let key = migration_shard::Key {
                global_id,
                build_version: self.target_version.clone(),
                deploy_generation: Some(self.deploy_generation),
            };
            updates.push(((key, shard_id), write_ts, 1));
        }

        let upper = Antichain::from_elem(write_ts);
        let new_upper = Antichain::from_elem(write_ts.step_forward());
        debug!(%write_ts, "attempting insert into migration shard");
        let result = persist_write
            .compare_and_append(updates, upper, new_upper)
            .await
            .expect("valid usage");

        match result {
            Ok(()) => {
                debug!(
                    %write_ts, ?replaced_shards,
                    "successfully inserted into migration shard"
                );
                Ok(Some(replaced_shards))
            }
            // A concurrent writer advanced the shard past `write_ts`; signal the caller to retry.
            Err(_mismatch) => Ok(None),
        }
    }
880
881    /// Open writer and reader for the migration shard.
882    async fn open_migration_shard(
883        &self,
884        diagnostics: Diagnostics,
885    ) -> (
886        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
887        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
888    ) {
889        let persist = &self.config.persist_client;
890
891        persist
892            .open(
893                self.migration_shard,
894                Arc::new(migration_shard::KeySchema),
895                Arc::new(ShardIdSchema),
896                diagnostics,
897                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
898            )
899            .await
900            .expect("valid usage")
901    }
902
903    /// Open a [`SinceHandle`] for the migration shard.
904    async fn open_migration_shard_since(
905        &self,
906        diagnostics: Diagnostics,
907    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
908        self.config
909            .persist_client
910            .open_critical_since(
911                self.migration_shard,
912                // TODO: We may need to use a different critical reader
913                // id for this if we want to be able to introspect it via SQL.
914                PersistClient::CONTROLLER_CRITICAL_SINCE,
915                Opaque::encode(&i64::MIN),
916                diagnostics.clone(),
917            )
918            .await
919            .expect("valid usage")
920    }
921
922    /// Update the fingerprints for `migrated_items`.
923    ///
924    /// Returns the new fingerprints. Also asserts that the current fingerprints of all other
925    /// system items match their builtin definitions.
926    fn update_fingerprints(
927        &self,
928        migrated_items: &BTreeSet<SystemObjectDescription>,
929    ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
930        let mut new_fingerprints = BTreeMap::new();
931        for (object, object_info) in &self.system_objects {
932            let id = object_info.global_id;
933            let builtin = object_info.builtin;
934
935            let fingerprint = builtin.fingerprint();
936            if fingerprint == object_info.fingerprint {
937                continue; // fingerprint unchanged, nothing to do
938            }
939
940            // Fingerprint mismatch is expected for a migrated item.
941            let migrated = migrated_items.contains(object);
942            // Some builtin types have schemas but no durable state. No migration needed for those.
943            let ephemeral = matches!(
944                builtin,
945                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
946            );
947
948            if migrated || ephemeral {
949                new_fingerprints.insert(object.clone(), fingerprint);
950            } else if builtin.runtime_alterable() {
951                // Runtime alterable builtins have no meaningful builtin fingerprint, and a
952                // sentinel value stored in the catalog.
953                assert_eq!(
954                    object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
955                    "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
956                );
957            } else {
958                panic!(
959                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
960                    fingerprint, object_info.fingerprint,
961                );
962            }
963        }
964
965        Ok(new_fingerprints)
966    }
967
    /// Perform cleanup of migration state, i.e. the migration shard.
    ///
    /// Returns a list of shards to finalize, and a `Future` that must be run after the shard
    /// finalization has been durably enqueued. The `Future` is used to remove entries from the
    /// migration shard only after we know the respective shards will be finalized. Removing
    /// entries immediately would risk leaking the shards.
    ///
    /// We only perform cleanup in leader mode, to keep the durable state changes made by read-only
    /// processes as minimal as possible. Given that Materialize doesn't support version
    /// downgrades, it is safe to assume that any state for versions below the `target_version` is
    /// not needed anymore and can be cleaned up.
    ///
    /// Note that it is fine for cleanup to sometimes fail or be skipped. The size of the migration
    /// shard should always be pretty small, so keeping migration state around for longer isn't a
    /// concern. As a result, we can keep the logic simple here and skip doing cleanup in response
    /// to transient failures, instead of retrying.
    async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
        // Default result for the cases in which there is nothing to clean up.
        let noop_action = async {}.boxed();
        let noop_result = (BTreeSet::new(), noop_action);

        if self.config.read_only {
            return Ok(noop_result);
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: "builtin schema migration cleanup".into(),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;
        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;

        let upper = persist_write.fetch_recent_upper().await.clone();
        let write_ts = *upper.as_option().expect("migration shard not sealed");
        // `step_back` returns `None` only at the minimum timestamp, i.e. when nothing was ever
        // written to the shard, in which case there is nothing to clean up.
        let Some(read_ts) = write_ts.step_back() else {
            return Ok(noop_result);
        };

        // Collect old entries to remove.
        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
        else {
            return Ok(noop_result);
        };

        debug!(
            ?stale_entries,
            "cleaning migration shard up to version {}", self.target_version,
        );

        // The shards currently backing builtin collections; these must not be finalized.
        let current_shards: BTreeMap<_, _> = self
            .system_objects
            .values()
            .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
            .collect();

        let mut shards_to_finalize = BTreeSet::new();
        let mut retractions = Vec::new();
        for (key, shard_id) in stale_entries {
            // The migration shard contains both shards created during aborted upgrades and shards
            // created during successful upgrades. The latter may still be in use, so we have to
            // check and only finalize those that aren't anymore.
            let gid = GlobalId::System(key.global_id);
            if current_shards.get(&gid) != Some(&shard_id) {
                shards_to_finalize.insert(shard_id);
            }

            // Retract the entry in either case; it describes a pre-`target_version` migration.
            retractions.push(((key, shard_id), write_ts, -1));
        }

        // The returned action applies the retractions. It runs only after the caller has durably
        // enqueued shard finalization (see the method docs above).
        let cleanup_action = async move {
            if !retractions.is_empty() {
                let new_upper = Antichain::from_elem(write_ts.step_forward());
                let result = persist_write
                    .compare_and_append(retractions, upper, new_upper)
                    .await
                    .expect("valid usage");
                match result {
                    Ok(()) => debug!("cleaned up migration shard"),
                    // A concurrent write invalidated our expected upper; skip cleanup this time.
                    // That's fine, per the method docs we don't retry transient failures.
                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
                }
            }
        }
        .boxed();

        // Downgrade the since, to enable some compaction.
        let o = persist_since.opaque().clone();
        let new_since = Antichain::from_elem(read_ts);
        let result = persist_since
            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
            .await;
        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");

        Ok((shards_to_finalize, cleanup_action))
    }
1063}
1064
1065/// Read the migration shard at the given timestamp, returning all entries that match the given
1066/// predicate.
1067///
1068/// Returns `None` if the migration shard contains no matching entries, or if it isn't readable at
1069/// `read_ts`.
1070async fn read_migration_shard<P>(
1071    persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
1072    read_ts: Timestamp,
1073    predicate: P,
1074) -> Option<Vec<(migration_shard::Key, ShardId)>>
1075where
1076    P: for<'a> Fn(&migration_shard::Key) -> bool,
1077{
1078    let as_of = Antichain::from_elem(read_ts);
1079    let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
1080
1081    assert!(
1082        updates.iter().all(|(_, _, diff)| *diff == 1),
1083        "migration shard contains invalid diffs: {updates:?}",
1084    );
1085
1086    let entries: Vec<_> = updates
1087        .into_iter()
1088        .map(|(data, _, _)| data)
1089        .filter(move |(key, _)| predicate(key))
1090        .collect();
1091
1092    (!entries.is_empty()).then_some(entries)
1093}
1094
/// A plan to migrate between two versions.
///
/// Partitions the objects that require migration by the mechanism used to migrate them (see the
/// module docs for a description of the two mechanisms).
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate using the `Evolution` mechanism.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate using the `Replacement` mechanism.
    replace: Vec<SystemObjectDescription>,
}
1103
/// Types and persist codec impls for the migration shard used by the `Replacement` mechanism.
///
/// The shard maps [`Key`]s (global ID, build version, deploy generation) to the `ShardId`s of the
/// replacement shards created for them.
mod migration_shard {
    use std::fmt;
    use std::str::FromStr;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;
    use semver::Version;
    use serde::{Deserialize, Serialize};

    /// The key type of the migration shard.
    ///
    /// Keys are persisted as strings, in one of two formats (see the `Display` impl).
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
    pub(super) struct Key {
        pub(super) global_id: u64,
        pub(super) build_version: Version,
        // Versions < 26.0 didn't include the deploy generation. As long as we still might
        // encounter migration shard entries that don't have it, we need to keep this an `Option`
        // and keep supporting both key formats.
        pub(super) deploy_generation: Option<u64>,
    }

    // Renders the key in the format it is persisted in: JSON for current keys,
    // `{global_id}-{build_version}` for pre-26.0 keys (which lack a deploy generation).
    impl fmt::Display for Key {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            if self.deploy_generation.is_some() {
                // current format
                let s = serde_json::to_string(self).expect("JSON serializable");
                f.write_str(&s)
            } else {
                // pre-26.0 format
                write!(f, "{}-{}", self.global_id, self.build_version)
            }
        }
    }

    // Inverse of the `Display` impl: tries the current (JSON) format first, then falls back to
    // the pre-26.0 format.
    impl FromStr for Key {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, String> {
            // current format
            if let Ok(key) = serde_json::from_str(s) {
                return Ok(key);
            };

            // pre-26.0 format
            // `splitn(2, '-')` splits at the first '-' only, so build versions that themselves
            // contain hyphens (e.g. prerelease versions) stay intact in the second part.
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid Key '{s}'"));
            };
            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
            let build_version = build_version
                .parse::<Version>()
                .map_err(|e| e.to_string())?;
            Ok(Key {
                global_id,
                build_version,
                deploy_generation: None,
            })
        }
    }

    // NOTE(review): presumably required by the columnar decoding path, which constructs
    // placeholder values before `read`ing into them (see `SimpleColumnarData::read` below taking
    // `&mut self`) — confirm against `mz_persist_types` docs.
    impl Default for Key {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: Version::new(0, 0, 0),
                deploy_generation: Some(0),
            }
        }
    }

    impl Codec for Key {
        type Schema = KeySchema;
        type Storage = ();

        fn codec_name() -> String {
            // NOTE(review): the name doesn't match the type — presumably historical. Codec names
            // are persisted with shard state, so it must not be changed regardless.
            "TableKey".into()
        }

        fn encode<B: BufMut>(&self, buf: &mut B) {
            // Keys are encoded via their string representation (see the `Display` impl).
            buf.put(self.to_string().as_bytes())
        }

        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
            s.parse()
        }

        fn encode_schema(_schema: &KeySchema) -> Bytes {
            // `KeySchema` is a unit type; there is nothing to encode.
            Bytes::new()
        }

        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            KeySchema
        }
    }

    impl SimpleColumnarData for Key {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            // Number of value bytes currently buffered in the builder.
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }

        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }

        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("valid Key");
        }
    }

    /// Persist schema for [`Key`].
    #[derive(Debug, PartialEq)]
    pub(super) struct KeySchema;

    impl Schema<Key> for KeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;
        type Decoder = SimpleColumnarDecoder<Key>;
        type Encoder = SimpleColumnarEncoder<Key>;

        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}
1245
1246#[cfg(test)]
1247#[path = "builtin_schema_migration_tests.rs"]
1248mod tests;