Skip to main content

mz_adapter/catalog/open/
builtin_schema_migration.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for migrating the schemas of builtin storage collections.
11//!
12//! If a version upgrade changes the schema of a builtin collection that's made durable in persist,
13//! that persist shard's schema must be migrated accordingly. The migration must happen in a way
14//! that's compatible with 0dt upgrades: Read-only environments need to be able to read the
15//! collections with the new schema, without interfering with the leader environment's continued
16//! use of the old schema.
17//!
18//! Two migration mechanisms are provided:
19//!
20//!  * [`Mechanism::Evolution`] uses persist's schema evolution support to evolve the persist
21//!    shard's schema in-place. Only works for backward-compatible changes.
22//!  * [`Mechanism::Replacement`] creates a new shard to serve the builtin collection in the new
23//!    version. Works for all schema changes but discards existing data.
24//!
25//! Which mechanism to use is selected through entries in the `MIGRATIONS` list. In general, the
26//! `Evolution` mechanism should be used when possible, as it avoids data loss.
27//!
28//! For more context and details on the implementation, see
29//! `doc/developer/design/20251015_builtin_schema_migration.md`.
30
31use std::collections::{BTreeMap, BTreeSet};
32use std::sync::{Arc, LazyLock};
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39    BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40    MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, MZ_STORAGE_USAGE_BY_SHARD,
41    MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
42};
43use mz_catalog::config::BuiltinItemMigrationConfig;
44use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
45use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
46use mz_catalog::memory::error::{Error, ErrorKind};
47use mz_ore::soft_assert_or_log;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
49use mz_persist_client::critical::{Opaque, SinceHandle};
50use mz_persist_client::read::ReadHandle;
51use mz_persist_client::schema::CaESchema;
52use mz_persist_client::write::WriteHandle;
53use mz_persist_client::{Diagnostics, PersistClient};
54use mz_persist_types::ShardId;
55use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
56use mz_persist_types::schema::backward_compatible;
57use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
58use mz_repr::{CatalogItemId, GlobalId, Timestamp};
59use mz_sql::catalog::{CatalogItemType, NameReference};
60use mz_storage_client::controller::StorageTxn;
61use mz_storage_types::StorageDiff;
62use mz_storage_types::sources::SourceData;
63use semver::Version;
64use timely::progress::Antichain;
65use tracing::{debug, info};
66
67use crate::catalog::migrate::get_migration_version;
68
69/// Builtin schema migrations required to upgrade to the current build version.
70///
71/// Migration steps for old versions must be retained around according to the upgrade policy. For
72/// example, if we support upgrading one major version at a time, the release of version `N.0.0`
73/// can delete all migration steps with versions before `(N-1).0.0`.
74///
75/// Smallest supported version: 0.147.0
76static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
77    vec![
78        MigrationStep::replacement(
79            "0.149.0",
80            CatalogItemType::Source,
81            MZ_INTERNAL_SCHEMA,
82            "mz_sink_statistics_raw",
83        ),
84        MigrationStep::replacement(
85            "0.149.0",
86            CatalogItemType::Source,
87            MZ_INTERNAL_SCHEMA,
88            "mz_source_statistics_raw",
89        ),
90        MigrationStep::evolution(
91            "0.159.0",
92            CatalogItemType::Source,
93            MZ_INTERNAL_SCHEMA,
94            "mz_cluster_replica_metrics_history",
95        ),
96        MigrationStep::replacement(
97            "0.160.0",
98            CatalogItemType::Table,
99            MZ_CATALOG_SCHEMA,
100            "mz_roles",
101        ),
102        MigrationStep::replacement(
103            "0.160.0",
104            CatalogItemType::Table,
105            MZ_CATALOG_SCHEMA,
106            "mz_sinks",
107        ),
108        MigrationStep::replacement(
109            "26.18.0-dev.0",
110            CatalogItemType::MaterializedView,
111            MZ_CATALOG_SCHEMA,
112            "mz_databases",
113        ),
114        MigrationStep::replacement(
115            "26.19.0-dev.0",
116            CatalogItemType::MaterializedView,
117            MZ_CATALOG_SCHEMA,
118            "mz_schemas",
119        ),
120        MigrationStep::replacement(
121            "26.19.0-dev.0",
122            CatalogItemType::MaterializedView,
123            MZ_CATALOG_SCHEMA,
124            "mz_role_members",
125        ),
126        MigrationStep::replacement(
127            "26.19.0-dev.0",
128            CatalogItemType::MaterializedView,
129            MZ_INTERNAL_SCHEMA,
130            "mz_network_policies",
131        ),
132        MigrationStep::replacement(
133            "26.19.0-dev.0",
134            CatalogItemType::MaterializedView,
135            MZ_INTERNAL_SCHEMA,
136            "mz_network_policy_rules",
137        ),
138        MigrationStep::replacement(
139            "26.19.0-dev.0",
140            CatalogItemType::MaterializedView,
141            MZ_INTERNAL_SCHEMA,
142            "mz_cluster_workload_classes",
143        ),
144        MigrationStep::replacement(
145            "26.19.0-dev.0",
146            CatalogItemType::MaterializedView,
147            MZ_INTERNAL_SCHEMA,
148            "mz_internal_cluster_replicas",
149        ),
150        MigrationStep::replacement(
151            "26.19.0-dev.0",
152            CatalogItemType::MaterializedView,
153            MZ_INTERNAL_SCHEMA,
154            "mz_pending_cluster_replicas",
155        ),
156        MigrationStep::replacement(
157            "26.20.0-dev.0",
158            CatalogItemType::MaterializedView,
159            MZ_CATALOG_SCHEMA,
160            "mz_materialized_views",
161        ),
162        MigrationStep::replacement(
163            "26.22.0-dev.0",
164            CatalogItemType::MaterializedView,
165            MZ_CATALOG_SCHEMA,
166            "mz_connections",
167        ),
168        MigrationStep::replacement(
169            "26.22.0-dev.0",
170            CatalogItemType::MaterializedView,
171            MZ_CATALOG_SCHEMA,
172            "mz_secrets",
173        ),
174        MigrationStep::replacement(
175            "26.27.0-dev.0",
176            CatalogItemType::MaterializedView,
177            MZ_CATALOG_SCHEMA,
178            "mz_sources",
179        ),
180    ]
181});
182
183/// A migration required to upgrade past a specific version.
184#[derive(Clone, Debug)]
185struct MigrationStep {
186    /// The build version that requires this migration.
187    version: Version,
188    /// The object that requires migration.
189    object: SystemObjectDescription,
190    /// The migration mechanism to be used.
191    mechanism: Mechanism,
192}
193
194impl MigrationStep {
195    /// Helper to construct an `Evolution` migration step.
196    fn evolution(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
197        Self {
198            version: Version::parse(version).expect("valid"),
199            object: SystemObjectDescription {
200                schema_name: schema.into(),
201                object_type: type_,
202                object_name: name.into(),
203            },
204            mechanism: Mechanism::Evolution,
205        }
206    }
207
208    /// Helper to construct a `Replacement` migration step.
209    fn replacement(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
210        Self {
211            version: Version::parse(version).expect("valid"),
212            object: SystemObjectDescription {
213                schema_name: schema.into(),
214                object_type: type_,
215                object_name: name.into(),
216            },
217            mechanism: Mechanism::Replacement,
218        }
219    }
220}
221
/// Strategy for migrating the persisted schema of a builtin collection.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
// NOTE(review): both variants are currently constructed in this file (including by the forced
// migration paths in `Migration::run`), so this allow may be stale; confirm before removing.
#[allow(dead_code)]
enum Mechanism {
    /// Evolve the existing shard's schema in place via persist schema evolution.
    ///
    /// The existing contents are kept, but the change has to be backward compatible as
    /// determined by [`backward_compatible`].
    Evolution,
    /// Serve the collection from a fresh shard.
    ///
    /// Handles arbitrary schema changes, at the cost of discarding the existing contents.
    Replacement,
}
236
237/// The result of a builtin schema migration.
238pub(super) struct MigrationResult {
239    /// IDs of items whose shards have been replaced using the `Replacement` mechanism.
240    pub replaced_items: BTreeSet<CatalogItemId>,
241    /// A cleanup action to take once the migration has been made durable.
242    pub cleanup_action: BoxFuture<'static, ()>,
243}
244
245impl Default for MigrationResult {
246    fn default() -> Self {
247        Self {
248            replaced_items: Default::default(),
249            cleanup_action: async {}.boxed(),
250        }
251    }
252}
253
254/// Run builtin schema migrations.
255///
256/// This is the entry point used by adapter when opening the catalog. It uses the hardcoded
257/// `BUILTINS` and `MIGRATIONS` lists to initialize the lists of available builtins and required
258/// migrations, respectively.
259pub(super) async fn run(
260    build_info: &BuildInfo,
261    deploy_generation: u64,
262    txn: &mut Transaction<'_>,
263    config: BuiltinItemMigrationConfig,
264) -> Result<MigrationResult, Error> {
265    // Sanity check to ensure we're not touching durable state in read-only mode.
266    assert_eq!(config.read_only, txn.is_savepoint());
267
268    // Tests may provide a dummy build info that confuses the migration step selection logic. Skip
269    // migrations if we observe this build info.
270    if *build_info == DUMMY_BUILD_INFO {
271        return Ok(MigrationResult::default());
272    }
273
274    let Some(durable_version) = get_migration_version(txn) else {
275        // New catalog; nothing to do.
276        return Ok(MigrationResult::default());
277    };
278    let build_version = build_info.semver_version();
279
280    let collection_metadata = txn.get_collection_metadata();
281    let system_objects = txn
282        .get_system_object_mappings()
283        .map(|m| {
284            let object = m.description;
285            let global_id = m.unique_identifier.global_id;
286            let shard_id = collection_metadata.get(&global_id).copied();
287            let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
288                panic!("missing builtin {object:?}");
289            };
290            let info = ObjectInfo {
291                global_id,
292                shard_id,
293                builtin,
294                fingerprint: m.unique_identifier.fingerprint,
295            };
296            (object, info)
297        })
298        .collect();
299
300    let migration_shard = txn.get_builtin_migration_shard().expect("must exist");
301
302    let migration = Migration {
303        source_version: durable_version.clone(),
304        target_version: build_version.clone(),
305        deploy_generation,
306        system_objects,
307        migration_shard,
308        config,
309    };
310
311    let result = migration.run(&MIGRATIONS).await.map_err(|e| {
312        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
313            last_seen_version: durable_version.to_string(),
314            this_version: build_version.to_string(),
315            cause: e.to_string(),
316        })
317    })?;
318
319    result.apply(txn);
320
321    let replaced_items = txn
322        .get_system_object_mappings()
323        .map(|m| m.unique_identifier)
324        .filter(|ids| result.new_shards.contains_key(&ids.global_id))
325        .map(|ids| ids.catalog_id)
326        .collect();
327
328    Ok(MigrationResult {
329        replaced_items,
330        cleanup_action: result.cleanup_action,
331    })
332}
333
334/// Result produced by `Migration::run`.
335struct MigrationRunResult {
336    new_shards: BTreeMap<GlobalId, ShardId>,
337    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
338    shards_to_finalize: BTreeSet<ShardId>,
339    cleanup_action: BoxFuture<'static, ()>,
340}
341
342impl Default for MigrationRunResult {
343    fn default() -> Self {
344        Self {
345            new_shards: BTreeMap::new(),
346            new_fingerprints: BTreeMap::new(),
347            shards_to_finalize: BTreeSet::new(),
348            cleanup_action: async {}.boxed(),
349        }
350    }
351}
352
353impl MigrationRunResult {
354    /// Apply this migration result to the given transaction.
355    fn apply(&self, txn: &mut Transaction<'_>) {
356        // Update collection metadata.
357        let replaced_ids = self.new_shards.keys().copied().collect();
358        let old_metadata = txn.delete_collection_metadata(replaced_ids);
359        txn.insert_collection_metadata(self.new_shards.clone())
360            .expect("inserting unique shards IDs after deleting existing entries");
361
362        // Register shards for finalization.
363        let mut unfinalized_shards: BTreeSet<_> =
364            old_metadata.into_iter().map(|(_, sid)| sid).collect();
365        unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
366        txn.insert_unfinalized_shards(unfinalized_shards)
367            .expect("cannot fail");
368
369        // Update fingerprints.
370        let mappings = txn
371            .get_system_object_mappings()
372            .filter_map(|m| {
373                let fingerprint = self.new_fingerprints.get(&m.description)?;
374                Some(SystemObjectMapping {
375                    description: m.description,
376                    unique_identifier: SystemObjectUniqueIdentifier {
377                        catalog_id: m.unique_identifier.catalog_id,
378                        global_id: m.unique_identifier.global_id,
379                        fingerprint: fingerprint.clone(),
380                    },
381                })
382            })
383            .collect();
384        txn.set_system_object_mappings(mappings)
385            .expect("filtered existing mappings remain unique");
386    }
387}
388
389/// Information about a system object required to run a `Migration`.
390#[derive(Clone, Debug)]
391struct ObjectInfo {
392    global_id: GlobalId,
393    shard_id: Option<ShardId>,
394    builtin: &'static Builtin<NameReference>,
395    fingerprint: String,
396}
397
398/// Context of a builtin schema migration.
399struct Migration {
400    /// The version we are migrating from.
401    ///
402    /// Same as the build version of the most recent leader process that successfully performed
403    /// migrations.
404    source_version: Version,
405    /// The version we are migration to.
406    ///
407    /// Same as the build version of this process.
408    target_version: Version,
409    /// The deploy generation of this process.
410    deploy_generation: u64,
411    /// Information about all objects in the system.
412    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
413    /// The ID of the migration shard.
414    migration_shard: ShardId,
415    /// Additional configuration.
416    config: BuiltinItemMigrationConfig,
417}
418
419impl Migration {
420    async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
421        info!(
422            deploy_generation = %self.deploy_generation,
423            "running builtin schema migration: {} -> {}",
424            self.source_version, self.target_version
425        );
426
427        self.validate_migration_steps(steps);
428
429        // Version-based migration filter fails for dev versions, see for example
430        // https://github.com/MaterializeInc/database-issues/issues/11335
431        let force_migration = if self.source_version != self.target_version
432            && self.source_version.pre.as_str().starts_with("dev")
433            && self.config.force_migration.is_none()
434        {
435            Some("evolution".to_string())
436        } else {
437            self.config.force_migration.clone()
438        };
439
440        let (force, plan) = match force_migration.as_deref() {
441            None => (false, self.plan_migration(steps)),
442            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
443            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
444            Some(other) => panic!("unknown force migration mechanism: {other}"),
445        };
446
447        if self.source_version == self.target_version && !force {
448            info!("skipping migration: already at target version");
449            return Ok(MigrationRunResult::default());
450        } else if self.source_version > self.target_version {
451            bail!("downgrade not supported");
452        }
453
454        // In leader mode, upgrade the version of the migration shard to the target version.
455        // This fences out any readers at lower versions.
456        if !self.config.read_only {
457            self.upgrade_migration_shard_version().await;
458        }
459
460        info!("executing migration plan: {plan:?}");
461
462        self.migrate_evolve(&plan.evolve).await?;
463        let new_shards = self.migrate_replace(&plan.replace).await?;
464
465        let mut migrated_objects = BTreeSet::new();
466        migrated_objects.extend(plan.evolve);
467        migrated_objects.extend(plan.replace);
468
469        let new_fingerprints = self.update_fingerprints(&migrated_objects)?;
470
471        let (shards_to_finalize, cleanup_action) = self.cleanup().await?;
472
473        Ok(MigrationRunResult {
474            new_shards,
475            new_fingerprints,
476            shards_to_finalize,
477            cleanup_action,
478        })
479    }
480
481    /// Sanity check the given migration steps.
482    ///
483    /// If any of these checks fail, that's a bug in Materialize, and we panic immediately.
484    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
485        for step in steps {
486            assert!(
487                step.version <= self.target_version,
488                "migration step version greater than target version: {} > {}",
489                step.version,
490                self.target_version,
491            );
492
493            let object = &step.object;
494
495            // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly, it would
496            // cause the table to be truncated because the contents are not also stored in the durable
497            // catalog. Secondly, we prune `mz_storage_usage_by_shard` of old events in the background
498            // on startup. The correctness of that pruning relies on there being no other retractions
499            // to `mz_storage_usage_by_shard`.
500            //
501            // TODO: Confirm the above reasoning, it might be outdated?
502            assert_ne!(
503                &*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
504                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
505            );
506
507            // Same hazard as `mz_storage_usage_by_shard`: the startup pruner
508            // (`Coordinator::prune_arrangement_sizes_history_on_startup`) assumes it is
509            // the only source of retractions, so migration-driven truncation would break it.
510            assert_ne!(
511                &*MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, object,
512                "mz_object_arrangement_size_history cannot be migrated or else the table will be truncated"
513            );
514
515            // `mz_catalog_raw` cannot be migrated because it contains the durable catalog and it
516            // wouldn't be very durable if we allowed it to be truncated.
517            assert_ne!(
518                &*MZ_CATALOG_RAW_DESCRIPTION, object,
519                "mz_catalog_raw cannot be migrated"
520            );
521
522            let Some(object_info) = self.system_objects.get(object) else {
523                panic!("migration step for non-existent builtin: {object:?}");
524            };
525
526            let builtin = object_info.builtin;
527            use Builtin::*;
528            assert!(
529                matches!(builtin, Table(..) | Source(..) | MaterializedView(..)),
530                "schema migration not supported for builtin: {builtin:?}",
531            );
532        }
533    }
534
535    /// Select for each object to migrate the appropriate migration mechanism.
536    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
537        // Ignore any steps at versions before `source_version`.
538        let steps = steps.iter().filter(|s| s.version > self.source_version);
539
540        // Select a mechanism for each object, according to the requested migrations:
541        //  * If any `Replacement` was requested, use `Replacement`.
542        //  * Otherwise, (i.e. only `Evolution` was requested), use `Evolution`.
543        let mut by_object = BTreeMap::new();
544        for step in steps {
545            if let Some(entry) = by_object.get_mut(&step.object) {
546                *entry = match (step.mechanism, *entry) {
547                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
548                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
549                        Mechanism::Replacement
550                    }
551                };
552            } else {
553                by_object.insert(step.object.clone(), step.mechanism);
554            }
555        }
556
557        let mut plan = Plan::default();
558        for (object, mechanism) in by_object {
559            match mechanism {
560                Mechanism::Evolution => plan.evolve.push(object),
561                Mechanism::Replacement => plan.replace.push(object),
562            }
563        }
564
565        plan
566    }
567
568    /// Plan a forced migration of all objects using the given mechanism.
569    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
570        let objects = self
571            .system_objects
572            .iter()
573            // Skip objects that don't yet have a shard registered. These are brand-new builtins
574            // added in this version; the leader will allocate their shards during bootstrap, and
575            // there is nothing to evolve or replace.
576            .filter(|(_, info)| info.shard_id.is_some())
577            .filter(|(_, info)| {
578                use Builtin::*;
579                match info.builtin {
580                    // Filter out the 'mz_storage_usage_by_shard' table since we need to retain
581                    // that info for billing purposes.
582                    Table(table) => **table != *MZ_STORAGE_USAGE_BY_SHARD,
583                    MaterializedView(..) => true,
584                    Source(source) => **source != *MZ_CATALOG_RAW,
585                    Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
586                }
587            })
588            .map(|(object, _)| object.clone())
589            .collect();
590
591        let mut plan = Plan::default();
592        match mechanism {
593            Mechanism::Evolution => plan.evolve = objects,
594            Mechanism::Replacement => plan.replace = objects,
595        }
596
597        plan
598    }
599
600    /// Upgrade the migration shard to the target version.
601    async fn upgrade_migration_shard_version(&self) {
602        let persist = &self.config.persist_client;
603        let diagnostics = Diagnostics {
604            shard_name: "builtin_migration".to_string(),
605            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
606        };
607
608        persist
609            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
610                self.migration_shard,
611                diagnostics,
612            )
613            .await
614            .expect("valid usage");
615    }
616
617    /// Migrate the given objects using the `Evolution` mechanism.
618    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
619        for object in objects {
620            self.migrate_evolve_one(object).await?;
621        }
622        Ok(())
623    }
624
625    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
626        let persist = &self.config.persist_client;
627
628        let Some(object_info) = self.system_objects.get(object) else {
629            bail!("missing builtin {object:?}");
630        };
631        let id = object_info.global_id;
632
633        let Some(shard_id) = object_info.shard_id else {
634            // No shard is registered for this builtin. In leader mode, this is fine, we'll
635            // register the shard during bootstrap. In read-only mode, we might be racing with the
636            // leader to register the shard and it's unclear what sort of confusion can arise from
637            // that -- better to bail out in this case.
638            if self.config.read_only {
639                bail!("missing shard ID for builtin {object:?} ({id})");
640            } else {
641                return Ok(());
642            }
643        };
644
645        let target_desc = match object_info.builtin {
646            Builtin::Table(table) => &table.desc,
647            Builtin::Source(source) => &source.desc,
648            Builtin::MaterializedView(mv) => &mv.desc,
649            _ => bail!("not a storage collection: {object:?}"),
650        };
651
652        let diagnostics = Diagnostics {
653            shard_name: id.to_string(),
654            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
655        };
656        let source_schema = persist
657            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
658            .await
659            .expect("valid usage");
660
661        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");
662
663        if self.config.read_only {
664            // In read-only mode, only check that the new schema is backward compatible.
665            // We'll register it when/if we restart in leader mode.
666            if let Some((_, source_desc, _)) = &source_schema {
667                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
668                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
669                if backward_compatible(&old, &new).is_none() {
670                    bail!(
671                        "incompatible schema evolution for {object:?}: \
672                         {source_desc:?} -> {target_desc:?}"
673                    );
674                }
675            }
676
677            return Ok(());
678        }
679
680        let (mut schema_id, mut source_desc) = match source_schema {
681            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
682            None => {
683                // If no schema was previously registered, simply try to register the new one. This
684                // might fail due to a concurrent registration, in which case we'll fall back to
685                // `compare_and_evolve_schema`.
686
687                debug!(%id, %shard_id, "no previous schema found; registering initial one");
688                let schema_id = persist
689                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
690                        shard_id,
691                        target_desc,
692                        &UnitSchema,
693                        diagnostics.clone(),
694                    )
695                    .await
696                    .expect("valid usage");
697                if schema_id.is_some() {
698                    return Ok(());
699                }
700
701                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
702                let (schema_id, source_desc, _) = persist
703                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
704                        shard_id,
705                        diagnostics.clone(),
706                    )
707                    .await
708                    .expect("valid usage")
709                    .expect("known to exist");
710
711                (schema_id, source_desc)
712            }
713        };
714
715        loop {
716            // Evolving the schema might fail if another process evolved the schema concurrently,
717            // in which case we need to retry. Most likely the other process evolved the schema to
718            // our own target schema and the second try is a no-op.
719
720            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
721            let result = persist
722                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
723                    shard_id,
724                    schema_id,
725                    target_desc,
726                    &UnitSchema,
727                    diagnostics.clone(),
728                )
729                .await
730                .expect("valid usage");
731
732            match result {
733                CaESchema::Ok(schema_id) => {
734                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
735                    break;
736                }
737                CaESchema::Incompatible => bail!(
738                    "incompatible schema evolution for {object:?}: \
739                     {source_desc:?} -> {target_desc:?}"
740                ),
741                CaESchema::ExpectedMismatch {
742                    schema_id: new_id,
743                    key,
744                    val: UnitSchema,
745                } => {
746                    schema_id = new_id;
747                    source_desc = key;
748                }
749            }
750        }
751
752        Ok(())
753    }
754
755    /// Migrate the given objects using the `Replacement` mechanism.
756    async fn migrate_replace(
757        &self,
758        objects: &[SystemObjectDescription],
759    ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
760        if objects.is_empty() {
761            return Ok(Default::default());
762        }
763
764        let diagnostics = Diagnostics {
765            shard_name: "builtin_migration".to_string(),
766            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
767        };
768        let (mut persist_write, mut persist_read) =
769            self.open_migration_shard(diagnostics.clone()).await;
770
771        let mut ids_to_replace = BTreeSet::new();
772        for object in objects {
773            if let Some(info) = self.system_objects.get(object) {
774                ids_to_replace.insert(info.global_id);
775            } else {
776                bail!("missing id for builtin {object:?}");
777            }
778        }
779
780        info!(?objects, ?ids_to_replace, "migrating by replacement");
781
782        // Fetch replacement shard IDs from the migration shard, or insert new ones if none exist.
783        // This can fail due to writes by concurrent processes, so we need to retry.
784        let replaced_shards = loop {
785            if let Some(shards) = self
786                .try_get_or_insert_replacement_shards(
787                    &ids_to_replace,
788                    &mut persist_write,
789                    &mut persist_read,
790                )
791                .await?
792            {
793                break shards;
794            }
795        };
796
797        Ok(replaced_shards)
798    }
799
800    /// Try to get or insert replacement shards for the given IDs into the migration shard, at
801    /// `target_version` and `deploy_generation`.
802    ///
803    /// This method looks for existing entries in the migration shards and returns those if they
804    /// are present. Otherwise it generates new shard IDs and tries to insert them.
805    ///
806    /// The result of this call is `None` if no existing entries were found and inserting new ones
807    /// failed because of a concurrent write to the migration shard. In this case, the caller is
808    /// expected to retry.
809    async fn try_get_or_insert_replacement_shards(
810        &self,
811        ids_to_replace: &BTreeSet<GlobalId>,
812        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
813        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
814    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
815        let upper = persist_write.fetch_recent_upper().await;
816        let write_ts = *upper.as_option().expect("migration shard not sealed");
817
818        let mut ids_to_replace = ids_to_replace.clone();
819        let mut replaced_shards = BTreeMap::new();
820
821        // Another process might already have done a shard replacement at our version and
822        // generation, in which case we can directly reuse the replacement shards.
823        //
824        // Note that we can't assume that the previous process had the same `ids_to_replace` as we
825        // do. The set of migrations to run depends on both the source and the target version, and
826        // the migration shard is not keyed by source version. The previous writer might have seen
827        // a different source version, if there was a concurrent migration by a leader process.
828        if let Some(read_ts) = write_ts.step_back() {
829            let pred = |key: &migration_shard::Key| {
830                key.build_version == self.target_version
831                    && key.deploy_generation == Some(self.deploy_generation)
832            };
833            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
834                for (key, shard_id) in entries {
835                    let id = GlobalId::System(key.global_id);
836                    if ids_to_replace.remove(&id) {
837                        replaced_shards.insert(id, shard_id);
838                    }
839                }
840
841                debug!(
842                    %read_ts, ?replaced_shards, ?ids_to_replace,
843                    "found existing entries in migration shard",
844                );
845            }
846
847            if ids_to_replace.is_empty() {
848                return Ok(Some(replaced_shards));
849            }
850        }
851
852        // Generate new shard IDs and attempt to insert them into the migration shard. If we get a
853        // CaA failure at `write_ts` that means a concurrent process has inserted in the meantime
854        // and we need to re-check the migration shard contents.
855        let mut updates = Vec::new();
856        for id in ids_to_replace {
857            let shard_id = ShardId::new();
858            replaced_shards.insert(id, shard_id);
859
860            let GlobalId::System(global_id) = id else {
861                bail!("attempt to migrate a non-system collection: {id}");
862            };
863            let key = migration_shard::Key {
864                global_id,
865                build_version: self.target_version.clone(),
866                deploy_generation: Some(self.deploy_generation),
867            };
868            updates.push(((key, shard_id), write_ts, 1));
869        }
870
871        let upper = Antichain::from_elem(write_ts);
872        let new_upper = Antichain::from_elem(write_ts.step_forward());
873        debug!(%write_ts, "attempting insert into migration shard");
874        let result = persist_write
875            .compare_and_append(updates, upper, new_upper)
876            .await
877            .expect("valid usage");
878
879        match result {
880            Ok(()) => {
881                debug!(
882                    %write_ts, ?replaced_shards,
883                    "successfully inserted into migration shard"
884                );
885                Ok(Some(replaced_shards))
886            }
887            Err(_mismatch) => Ok(None),
888        }
889    }
890
891    /// Open writer and reader for the migration shard.
892    async fn open_migration_shard(
893        &self,
894        diagnostics: Diagnostics,
895    ) -> (
896        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
897        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
898    ) {
899        let persist = &self.config.persist_client;
900
901        persist
902            .open(
903                self.migration_shard,
904                Arc::new(migration_shard::KeySchema),
905                Arc::new(ShardIdSchema),
906                diagnostics,
907                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
908            )
909            .await
910            .expect("valid usage")
911    }
912
913    /// Open a [`SinceHandle`] for the migration shard.
914    async fn open_migration_shard_since(
915        &self,
916        diagnostics: Diagnostics,
917    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
918        self.config
919            .persist_client
920            .open_critical_since(
921                self.migration_shard,
922                // TODO: We may need to use a different critical reader
923                // id for this if we want to be able to introspect it via SQL.
924                PersistClient::CONTROLLER_CRITICAL_SINCE,
925                Opaque::encode(&i64::MIN),
926                diagnostics.clone(),
927            )
928            .await
929            .expect("valid usage")
930    }
931
932    /// Update the fingerprints for `migrated_items`.
933    ///
934    /// Returns the new fingerprints. Also asserts that the current fingerprints of all other
935    /// system items match their builtin definitions.
936    fn update_fingerprints(
937        &self,
938        migrated_items: &BTreeSet<SystemObjectDescription>,
939    ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
940        let mut new_fingerprints = BTreeMap::new();
941        for (object, object_info) in &self.system_objects {
942            let id = object_info.global_id;
943            let builtin = object_info.builtin;
944
945            let fingerprint = builtin.fingerprint();
946            if fingerprint == object_info.fingerprint {
947                continue; // fingerprint unchanged, nothing to do
948            }
949
950            // Fingerprint mismatch is expected for a migrated item.
951            let migrated = migrated_items.contains(object);
952            // Some builtin types have schemas but no durable state. No migration needed for those.
953            let ephemeral = matches!(
954                builtin,
955                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
956            );
957
958            if migrated || ephemeral {
959                new_fingerprints.insert(object.clone(), fingerprint);
960            } else if builtin.runtime_alterable() {
961                // Runtime alterable builtins have no meaningful builtin fingerprint, and a
962                // sentinel value stored in the catalog.
963                assert_eq!(
964                    object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
965                    "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
966                );
967            } else {
968                panic!(
969                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
970                    fingerprint, object_info.fingerprint,
971                );
972            }
973        }
974
975        Ok(new_fingerprints)
976    }
977
    /// Perform cleanup of migration state, i.e. the migration shard.
    ///
    /// Returns a list of shards to finalize, and a `Future` that must be run after the shard
    /// finalization has been durably enqueued. The `Future` is used to remove entries from the
    /// migration shard only after we know the respective shards will be finalized. Removing
    /// entries immediately would risk leaking the shards.
    ///
    /// We only perform cleanup in leader mode, to keep the durable state changes made by read-only
    /// processes as minimal as possible. Given that Materialize doesn't support version downgrades,
    /// it is safe to assume that any state for versions below the `target_version` is not needed
    /// anymore and can be cleaned up.
    ///
    /// Note that it is fine for cleanup to sometimes fail or be skipped. The size of the migration
    /// shard should always be pretty small, so keeping migration state around for longer isn't a
    /// concern. As a result, we can keep the logic simple here and skip doing cleanup in response
    /// to transient failures, instead of retrying.
    ///
    /// Stale entries are those whose `build_version` is below `target_version`. Every stale entry
    /// is retracted. Its shard is scheduled for finalization only if it isn't the shard currently
    /// backing the entry's collection. If stale entries exist, this also attempts to downgrade the
    /// migration shard's critical since to `read_ts`, enabling some compaction.
    async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
        // Returned whenever there is nothing to do: no shards to finalize, and a cleanup action
        // that does nothing.
        let noop_action = async {}.boxed();
        let noop_result = (BTreeSet::new(), noop_action);

        // Read-only processes leave the migration shard untouched.
        if self.config.read_only {
            return Ok(noop_result);
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: "builtin schema migration cleanup".into(),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;
        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;

        // Retractions are written at the current upper (`write_ts`). The shard contents are read
        // at the timestamp just before it. If `write_ts` can't be stepped back, there is no
        // timestamp to read at, so there is nothing to clean up.
        let upper = persist_write.fetch_recent_upper().await.clone();
        let write_ts = *upper.as_option().expect("migration shard not sealed");
        let Some(read_ts) = write_ts.step_back() else {
            return Ok(noop_result);
        };

        // Collect old entries to remove, i.e. those written by versions below `target_version`.
        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
        else {
            return Ok(noop_result);
        };

        debug!(
            ?stale_entries,
            "cleaning migration shard up to version {}", self.target_version,
        );

        // The shard currently backing each system collection that has one.
        let current_shards: BTreeMap<_, _> = self
            .system_objects
            .values()
            .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
            .collect();

        let mut shards_to_finalize = BTreeSet::new();
        let mut retractions = Vec::new();
        for (key, shard_id) in stale_entries {
            // The migration shard contains both shards created during aborted upgrades and shards
            // created during successful upgrades. The latter may still be in use, so we have to
            // check and only finalize those that aren't anymore.
            let gid = GlobalId::System(key.global_id);
            if current_shards.get(&gid) != Some(&shard_id) {
                shards_to_finalize.insert(shard_id);
            }

            // The entry is retracted either way.
            retractions.push(((key, shard_id), write_ts, -1));
        }

        // Deferred until the caller has durably enqueued finalization of `shards_to_finalize`.
        // `upper` is used as the expected upper, so the retractions only land if nothing else was
        // written to the migration shard in the meantime. On mismatch, cleanup is simply skipped
        // (best-effort, as described above).
        let cleanup_action = async move {
            if !retractions.is_empty() {
                let new_upper = Antichain::from_elem(write_ts.step_forward());
                let result = persist_write
                    .compare_and_append(retractions, upper, new_upper)
                    .await
                    .expect("valid usage");
                match result {
                    Ok(()) => debug!("cleaned up migration shard"),
                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
                }
            }
        }
        .boxed();

        // Downgrade the since, to enable some compaction.
        // Only an explicit `Err` (an opaque mismatch) is flagged; a `None` result is accepted.
        let o = persist_since.opaque().clone();
        let new_since = Antichain::from_elem(read_ts);
        let result = persist_since
            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
            .await;
        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");

        Ok((shards_to_finalize, cleanup_action))
    }
1073}
1074
1075/// Read the migration shard at the given timestamp, returning all entries that match the given
1076/// predicate.
1077///
1078/// Returns `None` if the migration shard contains no matching entries, or if it isn't readable at
1079/// `read_ts`.
1080async fn read_migration_shard<P>(
1081    persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
1082    read_ts: Timestamp,
1083    predicate: P,
1084) -> Option<Vec<(migration_shard::Key, ShardId)>>
1085where
1086    P: for<'a> Fn(&migration_shard::Key) -> bool,
1087{
1088    let as_of = Antichain::from_elem(read_ts);
1089    let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
1090
1091    assert!(
1092        updates.iter().all(|(_, _, diff)| *diff == 1),
1093        "migration shard contains invalid diffs: {updates:?}",
1094    );
1095
1096    let entries: Vec<_> = updates
1097        .into_iter()
1098        .map(|(data, _, _)| data)
1099        .filter(move |(key, _)| predicate(key))
1100        .collect();
1101
1102    (!entries.is_empty()).then_some(entries)
1103}
1104
1105/// A plan to migrate between two versions.
1106#[derive(Debug, Default)]
1107struct Plan {
1108    /// Objects to migrate using the `Evolution` mechanism.
1109    evolve: Vec<SystemObjectDescription>,
1110    /// Objects to migrate using the `Replacement` mechanism.
1111    replace: Vec<SystemObjectDescription>,
1112}
1113
1114/// Types and persist codec impls for the migration shard used by the `Replacement` mechanism.
1115mod migration_shard {
1116    use std::fmt;
1117    use std::str::FromStr;
1118
1119    use arrow::array::{StringArray, StringBuilder};
1120    use bytes::{BufMut, Bytes};
1121    use mz_persist_types::Codec;
1122    use mz_persist_types::codec_impls::{
1123        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
1124    };
1125    use mz_persist_types::columnar::Schema;
1126    use mz_persist_types::stats::NoneStats;
1127    use semver::Version;
1128    use serde::{Deserialize, Serialize};
1129
1130    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
1131    pub(super) struct Key {
1132        pub(super) global_id: u64,
1133        pub(super) build_version: Version,
1134        // Versions < 26.0 didn't include the deploy generation. As long as we still might
1135        // encounter migration shard entries that don't have it, we need to keep this an `Option`
1136        // and keep supporting both key formats.
1137        pub(super) deploy_generation: Option<u64>,
1138    }
1139
1140    impl fmt::Display for Key {
1141        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1142            if self.deploy_generation.is_some() {
1143                // current format
1144                let s = serde_json::to_string(self).expect("JSON serializable");
1145                f.write_str(&s)
1146            } else {
1147                // pre-26.0 format
1148                write!(f, "{}-{}", self.global_id, self.build_version)
1149            }
1150        }
1151    }
1152
1153    impl FromStr for Key {
1154        type Err = String;
1155
1156        fn from_str(s: &str) -> Result<Self, String> {
1157            // current format
1158            if let Ok(key) = serde_json::from_str(s) {
1159                return Ok(key);
1160            };
1161
1162            // pre-26.0 format
1163            let parts: Vec<_> = s.splitn(2, '-').collect();
1164            let &[global_id, build_version] = parts.as_slice() else {
1165                return Err(format!("invalid Key '{s}'"));
1166            };
1167            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
1168            let build_version = build_version
1169                .parse::<Version>()
1170                .map_err(|e| e.to_string())?;
1171            Ok(Key {
1172                global_id,
1173                build_version,
1174                deploy_generation: None,
1175            })
1176        }
1177    }
1178
1179    impl Default for Key {
1180        fn default() -> Self {
1181            Self {
1182                global_id: Default::default(),
1183                build_version: Version::new(0, 0, 0),
1184                deploy_generation: Some(0),
1185            }
1186        }
1187    }
1188
1189    impl Codec for Key {
1190        type Schema = KeySchema;
1191        type Storage = ();
1192
1193        fn codec_name() -> String {
1194            "TableKey".into()
1195        }
1196
1197        fn encode<B: BufMut>(&self, buf: &mut B) {
1198            buf.put(self.to_string().as_bytes())
1199        }
1200
1201        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
1202            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
1203            s.parse()
1204        }
1205
1206        fn encode_schema(_schema: &KeySchema) -> Bytes {
1207            Bytes::new()
1208        }
1209
1210        fn decode_schema(buf: &Bytes) -> Self::Schema {
1211            assert_eq!(*buf, Bytes::new());
1212            KeySchema
1213        }
1214    }
1215
1216    impl SimpleColumnarData for Key {
1217        type ArrowBuilder = StringBuilder;
1218        type ArrowColumn = StringArray;
1219
1220        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
1221            builder.values_slice().len()
1222        }
1223
1224        fn push(&self, builder: &mut Self::ArrowBuilder) {
1225            builder.append_value(&self.to_string());
1226        }
1227
1228        fn push_null(builder: &mut Self::ArrowBuilder) {
1229            builder.append_null();
1230        }
1231
1232        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
1233            *self = column.value(idx).parse().expect("valid Key");
1234        }
1235    }
1236
1237    #[derive(Debug, PartialEq)]
1238    pub(super) struct KeySchema;
1239
1240    impl Schema<Key> for KeySchema {
1241        type ArrowColumn = StringArray;
1242        type Statistics = NoneStats;
1243        type Decoder = SimpleColumnarDecoder<Key>;
1244        type Encoder = SimpleColumnarEncoder<Key>;
1245
1246        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
1247            Ok(SimpleColumnarEncoder::default())
1248        }
1249
1250        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
1251            Ok(SimpleColumnarDecoder::new(col))
1252        }
1253    }
1254}
1255
1256#[cfg(test)]
1257#[path = "builtin_schema_migration_tests.rs"]
1258mod tests;