// mz_adapter/catalog/open/builtin_schema_migration.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for migrating the schemas of builtin storage collections.
11//!
12//! If a version upgrade changes the schema of a builtin collection that's made durable in persist,
13//! that persist shard's schema must be migrated accordingly. The migration must happen in a way
14//! that's compatible with 0dt upgrades: Read-only environments need to be able to read the
15//! collections with the new schema, without interfering with the leader environment's continued
16//! use of the old schema.
17//!
18//! Two migration mechanisms are provided:
19//!
20//!  * [`Mechanism::Evolution`] uses persist's schema evolution support to evolve the persist
21//!    shard's schema in-place. Only works for backward-compatible changes.
22//!  * [`Mechanism::Replacement`] creates a new shard to serve the builtin collection in the new
23//!    version. Works for all schema changes but discards existing data.
24//!
25//! Which mechanism to use is selected through entries in the `MIGRATIONS` list. In general, the
26//! `Evolution` mechanism should be used when possible, as it avoids data loss.
27//!
28//! For more context and details on the implementation, see
29//! `doc/developer/design/20251015_builtin_schema_migration.md`.
30
31use std::collections::{BTreeMap, BTreeSet};
32use std::sync::{Arc, LazyLock};
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39    BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40    MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
41};
42use mz_catalog::config::BuiltinItemMigrationConfig;
43use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
44use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
45use mz_catalog::memory::error::{Error, ErrorKind};
46use mz_ore::soft_assert_or_log;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
48use mz_persist_client::critical::{Opaque, SinceHandle};
49use mz_persist_client::read::ReadHandle;
50use mz_persist_client::schema::CaESchema;
51use mz_persist_client::write::WriteHandle;
52use mz_persist_client::{Diagnostics, PersistClient};
53use mz_persist_types::ShardId;
54use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
55use mz_persist_types::schema::backward_compatible;
56use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
57use mz_repr::{CatalogItemId, GlobalId, Timestamp};
58use mz_sql::catalog::{CatalogItemType, NameReference};
59use mz_storage_client::controller::StorageTxn;
60use mz_storage_types::StorageDiff;
61use mz_storage_types::sources::SourceData;
62use semver::Version;
63use timely::progress::Antichain;
64use tracing::{debug, info};
65
66use crate::catalog::migrate::get_migration_version;
67
68/// Builtin schema migrations required to upgrade to the current build version.
69///
70/// Migration steps for old versions must be retained around according to the upgrade policy. For
71/// example, if we support upgrading one major version at a time, the release of version `N.0.0`
72/// can delete all migration steps with versions before `(N-1).0.0`.
73///
74/// Smallest supported version: 0.147.0
75static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
76    vec![
77        MigrationStep::replacement(
78            "0.149.0",
79            CatalogItemType::Source,
80            MZ_INTERNAL_SCHEMA,
81            "mz_sink_statistics_raw",
82        ),
83        MigrationStep::replacement(
84            "0.149.0",
85            CatalogItemType::Source,
86            MZ_INTERNAL_SCHEMA,
87            "mz_source_statistics_raw",
88        ),
89        MigrationStep::evolution(
90            "0.159.0",
91            CatalogItemType::Source,
92            MZ_INTERNAL_SCHEMA,
93            "mz_cluster_replica_metrics_history",
94        ),
95        MigrationStep::replacement(
96            "0.160.0",
97            CatalogItemType::Table,
98            MZ_CATALOG_SCHEMA,
99            "mz_roles",
100        ),
101        MigrationStep::replacement(
102            "0.160.0",
103            CatalogItemType::Table,
104            MZ_CATALOG_SCHEMA,
105            "mz_sinks",
106        ),
107        MigrationStep::replacement(
108            "26.18.0-dev.0",
109            CatalogItemType::MaterializedView,
110            MZ_CATALOG_SCHEMA,
111            "mz_databases",
112        ),
113        MigrationStep::replacement(
114            "26.19.0-dev.0",
115            CatalogItemType::MaterializedView,
116            MZ_CATALOG_SCHEMA,
117            "mz_schemas",
118        ),
119        MigrationStep::replacement(
120            "26.19.0-dev.0",
121            CatalogItemType::MaterializedView,
122            MZ_CATALOG_SCHEMA,
123            "mz_role_members",
124        ),
125        MigrationStep::replacement(
126            "26.19.0-dev.0",
127            CatalogItemType::MaterializedView,
128            MZ_INTERNAL_SCHEMA,
129            "mz_network_policies",
130        ),
131        MigrationStep::replacement(
132            "26.19.0-dev.0",
133            CatalogItemType::MaterializedView,
134            MZ_INTERNAL_SCHEMA,
135            "mz_network_policy_rules",
136        ),
137        MigrationStep::replacement(
138            "26.19.0-dev.0",
139            CatalogItemType::MaterializedView,
140            MZ_INTERNAL_SCHEMA,
141            "mz_cluster_workload_classes",
142        ),
143        MigrationStep::replacement(
144            "26.19.0-dev.0",
145            CatalogItemType::MaterializedView,
146            MZ_INTERNAL_SCHEMA,
147            "mz_internal_cluster_replicas",
148        ),
149        MigrationStep::replacement(
150            "26.19.0-dev.0",
151            CatalogItemType::MaterializedView,
152            MZ_INTERNAL_SCHEMA,
153            "mz_pending_cluster_replicas",
154        ),
155    ]
156});
157
/// A migration required to upgrade past a specific version.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The build version that requires this migration.
    ///
    /// During planning, steps at versions <= the durable catalog's version are skipped, as they
    /// were already applied by a previous leader process.
    version: Version,
    /// The object that requires migration.
    object: SystemObjectDescription,
    /// The migration mechanism to be used.
    mechanism: Mechanism,
}
168
169impl MigrationStep {
170    /// Helper to construct an `Evolution` migration step.
171    fn evolution(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
172        Self {
173            version: Version::parse(version).expect("valid"),
174            object: SystemObjectDescription {
175                schema_name: schema.into(),
176                object_type: type_,
177                object_name: name.into(),
178            },
179            mechanism: Mechanism::Evolution,
180        }
181    }
182
183    /// Helper to construct a `Replacement` migration step.
184    fn replacement(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
185        Self {
186            version: Version::parse(version).expect("valid"),
187            object: SystemObjectDescription {
188                schema_name: schema.into(),
189                object_type: type_,
190                object_name: name.into(),
191            },
192            mechanism: Mechanism::Replacement,
193        }
194    }
195}
196
/// The mechanism to use to migrate the schema of a builtin collection.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[allow(dead_code)]
enum Mechanism {
    /// Persist schema evolution.
    ///
    /// Keeps existing contents but only works for schema changes that are backward compatible
    /// according to [`backward_compatible`].
    Evolution,
    /// Shard replacement.
    ///
    /// Works for arbitrary schema changes but loses existing contents. When both mechanisms are
    /// requested for the same object, `Replacement` wins (see `Migration::plan_migration`).
    Replacement,
}
211
/// The result of a builtin schema migration.
pub(super) struct MigrationResult {
    /// IDs of items whose shards have been replaced using the `Replacement` mechanism.
    ///
    /// The caller needs these to know which collections lost their previous contents.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// A cleanup action to take once the migration has been made durable.
    ///
    /// A no-op future when there is nothing to clean up (see the `Default` impl).
    pub cleanup_action: BoxFuture<'static, ()>,
}
219
220impl Default for MigrationResult {
221    fn default() -> Self {
222        Self {
223            replaced_items: Default::default(),
224            cleanup_action: async {}.boxed(),
225        }
226    }
227}
228
/// Run builtin schema migrations.
///
/// This is the entry point used by adapter when opening the catalog. It uses the hardcoded
/// `BUILTINS` and `MIGRATIONS` lists to initialize the lists of available builtins and required
/// migrations, respectively.
///
/// On success, the migration's changes have been applied to `txn` (but not committed) and the
/// returned result names the replaced items plus a cleanup action to run after commit.
pub(super) async fn run(
    build_info: &BuildInfo,
    deploy_generation: u64,
    txn: &mut Transaction<'_>,
    config: BuiltinItemMigrationConfig,
) -> Result<MigrationResult, Error> {
    // Sanity check to ensure we're not touching durable state in read-only mode.
    assert_eq!(config.read_only, txn.is_savepoint());

    // Tests may provide a dummy build info that confuses the migration step selection logic. Skip
    // migrations if we observe this build info.
    if *build_info == DUMMY_BUILD_INFO {
        return Ok(MigrationResult::default());
    }

    let Some(durable_version) = get_migration_version(txn) else {
        // New catalog; nothing to do.
        return Ok(MigrationResult::default());
    };
    let build_version = build_info.semver_version();

    // Collect, for each existing system object, the information the migration needs: its global
    // ID, its persist shard (if any), its static builtin definition, and its fingerprint.
    let collection_metadata = txn.get_collection_metadata();
    let system_objects = txn
        .get_system_object_mappings()
        .map(|m| {
            let object = m.description;
            let global_id = m.unique_identifier.global_id;
            let shard_id = collection_metadata.get(&global_id).copied();
            let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
                panic!("missing builtin {object:?}");
            };
            let info = ObjectInfo {
                global_id,
                shard_id,
                builtin,
                fingerprint: m.unique_identifier.fingerprint,
            };
            (object, info)
        })
        .collect();

    let migration_shard = txn.get_builtin_migration_shard().expect("must exist");

    let migration = Migration {
        source_version: durable_version.clone(),
        target_version: build_version.clone(),
        deploy_generation,
        system_objects,
        migration_shard,
        config,
    };

    // Surface any failure as a catalog error that names both versions, so it's clear which
    // upgrade step failed.
    let result = migration.run(&MIGRATIONS).await.map_err(|e| {
        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
            last_seen_version: durable_version.to_string(),
            this_version: build_version.to_string(),
            cause: e.to_string(),
        })
    })?;

    result.apply(txn);

    // Translate the replaced global IDs back into catalog item IDs for the caller.
    let replaced_items = txn
        .get_system_object_mappings()
        .map(|m| m.unique_identifier)
        .filter(|ids| result.new_shards.contains_key(&ids.global_id))
        .map(|ids| ids.catalog_id)
        .collect();

    Ok(MigrationResult {
        replaced_items,
        cleanup_action: result.cleanup_action,
    })
}
308
/// Result produced by `Migration::run`.
struct MigrationRunResult {
    /// New shards for collections replaced via the `Replacement` mechanism, keyed by collection
    /// ID.
    new_shards: BTreeMap<GlobalId, ShardId>,
    /// Updated fingerprints for migrated objects, to be written back to the system object
    /// mappings.
    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
    /// Shards that are no longer needed and should be registered for finalization.
    shards_to_finalize: BTreeSet<ShardId>,
    /// A cleanup action to take once the migration has been made durable.
    cleanup_action: BoxFuture<'static, ()>,
}
316
317impl Default for MigrationRunResult {
318    fn default() -> Self {
319        Self {
320            new_shards: BTreeMap::new(),
321            new_fingerprints: BTreeMap::new(),
322            shards_to_finalize: BTreeSet::new(),
323            cleanup_action: async {}.boxed(),
324        }
325    }
326}
327
impl MigrationRunResult {
    /// Apply this migration result to the given transaction.
    ///
    /// Updates the collection metadata, registers obsolete shards for finalization, and writes
    /// the new builtin fingerprints.
    fn apply(&self, txn: &mut Transaction<'_>) {
        // Update collection metadata. Deleting the old entries first ensures the subsequent
        // insert cannot conflict on the replaced global IDs.
        let replaced_ids = self.new_shards.keys().copied().collect();
        let old_metadata = txn.delete_collection_metadata(replaced_ids);
        txn.insert_collection_metadata(self.new_shards.clone())
            .expect("inserting unique shards IDs after deleting existing entries");

        // Register shards for finalization: both the shards we just replaced and any additional
        // ones the migration reported.
        let mut unfinalized_shards: BTreeSet<_> =
            old_metadata.into_iter().map(|(_, sid)| sid).collect();
        unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
        txn.insert_unfinalized_shards(unfinalized_shards)
            .expect("cannot fail");

        // Update fingerprints. Only mappings with an entry in `new_fingerprints` are rewritten;
        // all other mappings are left untouched.
        let mappings = txn
            .get_system_object_mappings()
            .filter_map(|m| {
                let fingerprint = self.new_fingerprints.get(&m.description)?;
                Some(SystemObjectMapping {
                    description: m.description,
                    unique_identifier: SystemObjectUniqueIdentifier {
                        catalog_id: m.unique_identifier.catalog_id,
                        global_id: m.unique_identifier.global_id,
                        fingerprint: fingerprint.clone(),
                    },
                })
            })
            .collect();
        txn.set_system_object_mappings(mappings)
            .expect("filtered existing mappings remain unique");
    }
}
363
/// Information about a system object required to run a `Migration`.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// The collection's global ID.
    global_id: GlobalId,
    /// The persist shard backing the collection, if one is registered in the catalog.
    shard_id: Option<ShardId>,
    /// The static builtin definition, which carries the target (new-version) schema.
    builtin: &'static Builtin<NameReference>,
    /// The fingerprint currently stored in the durable system object mapping.
    fingerprint: String,
}
372
/// Context of a builtin schema migration.
struct Migration {
    /// The version we are migrating from.
    ///
    /// Same as the build version of the most recent leader process that successfully performed
    /// migrations.
    source_version: Version,
    /// The version we are migrating to.
    ///
    /// Same as the build version of this process.
    target_version: Version,
    /// The deploy generation of this process.
    deploy_generation: u64,
    /// Information about all objects in the system.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// The ID of the migration shard.
    ///
    /// The migration shard stores replacement shard IDs, keyed by build version and deploy
    /// generation, so that concurrent processes converge on the same replacements.
    migration_shard: ShardId,
    /// Additional configuration.
    config: BuiltinItemMigrationConfig,
}
393
394impl Migration {
    /// Run the migration, executing the applicable subset of `steps`.
    ///
    /// Returns the durable state changes to be applied by the caller. Fails if any migration
    /// step cannot be performed (e.g. an incompatible schema evolution) or on a version
    /// downgrade.
    async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
        info!(
            deploy_generation = %self.deploy_generation,
            "running builtin schema migration: {} -> {}",
            self.source_version, self.target_version
        );

        self.validate_migration_steps(steps);

        // A forced migration (set via config) migrates all eligible objects with a single
        // mechanism, ignoring the version-based step selection.
        let (force, plan) = match self.config.force_migration.as_deref() {
            None => (false, self.plan_migration(steps)),
            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
            Some(other) => panic!("unknown force migration mechanism: {other}"),
        };

        if self.source_version == self.target_version && !force {
            info!("skipping migration: already at target version");
            return Ok(MigrationRunResult::default());
        } else if self.source_version > self.target_version {
            bail!("downgrade not supported");
        }

        // In leader mode, upgrade the version of the migration shard to the target version.
        // This fences out any readers at lower versions.
        if !self.config.read_only {
            self.upgrade_migration_shard_version().await;
        }

        info!("executing migration plan: {plan:?}");

        self.migrate_evolve(&plan.evolve).await?;
        let new_shards = self.migrate_replace(&plan.replace).await?;

        let mut migrated_objects = BTreeSet::new();
        migrated_objects.extend(plan.evolve);
        migrated_objects.extend(plan.replace);

        let new_fingerprints = self.update_fingerprints(&migrated_objects)?;

        let (shards_to_finalize, cleanup_action) = self.cleanup().await?;

        Ok(MigrationRunResult {
            new_shards,
            new_fingerprints,
            shards_to_finalize,
            cleanup_action,
        })
    }
444
445    /// Sanity check the given migration steps.
446    ///
447    /// If any of these checks fail, that's a bug in Materialize, and we panic immediately.
448    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
449        for step in steps {
450            assert!(
451                step.version <= self.target_version,
452                "migration step version greater than target version: {} > {}",
453                step.version,
454                self.target_version,
455            );
456
457            let object = &step.object;
458
459            // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly, it would
460            // cause the table to be truncated because the contents are not also stored in the durable
461            // catalog. Secondly, we prune `mz_storage_usage_by_shard` of old events in the background
462            // on startup. The correctness of that pruning relies on there being no other retractions
463            // to `mz_storage_usage_by_shard`.
464            //
465            // TODO: Confirm the above reasoning, it might be outdated?
466            assert_ne!(
467                &*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
468                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
469            );
470
471            // `mz_catalog_raw` cannot be migrated because it contains the durable catalog and it
472            // wouldn't be very durable if we allowed it to be truncated.
473            assert_ne!(
474                &*MZ_CATALOG_RAW_DESCRIPTION, object,
475                "mz_catalog_raw cannot be migrated"
476            );
477
478            let Some(object_info) = self.system_objects.get(object) else {
479                panic!("migration step for non-existent builtin: {object:?}");
480            };
481
482            let builtin = object_info.builtin;
483            use Builtin::*;
484            assert!(
485                matches!(
486                    builtin,
487                    Table(..) | Source(..) | MaterializedView(..) | ContinualTask(..)
488                ),
489                "schema migration not supported for builtin: {builtin:?}",
490            );
491        }
492    }
493
494    /// Select for each object to migrate the appropriate migration mechanism.
495    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
496        // Ignore any steps at versions before `source_version`.
497        let steps = steps.iter().filter(|s| s.version > self.source_version);
498
499        // Select a mechanism for each object, according to the requested migrations:
500        //  * If any `Replacement` was requested, use `Replacement`.
501        //  * Otherwise, (i.e. only `Evolution` was requested), use `Evolution`.
502        let mut by_object = BTreeMap::new();
503        for step in steps {
504            if let Some(entry) = by_object.get_mut(&step.object) {
505                *entry = match (step.mechanism, *entry) {
506                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
507                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
508                        Mechanism::Replacement
509                    }
510                };
511            } else {
512                by_object.insert(step.object.clone(), step.mechanism);
513            }
514        }
515
516        let mut plan = Plan::default();
517        for (object, mechanism) in by_object {
518            match mechanism {
519                Mechanism::Evolution => plan.evolve.push(object),
520                Mechanism::Replacement => plan.replace.push(object),
521            }
522        }
523
524        plan
525    }
526
527    /// Plan a forced migration of all objects using the given mechanism.
528    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
529        let objects = self
530            .system_objects
531            .iter()
532            .filter(|(_, info)| {
533                use Builtin::*;
534                match info.builtin {
535                    Table(..) | MaterializedView(..) | ContinualTask(..) => true,
536                    Source(source) => **source != *MZ_CATALOG_RAW,
537                    Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
538                }
539            })
540            .map(|(object, _)| object.clone())
541            .collect();
542
543        let mut plan = Plan::default();
544        match mechanism {
545            Mechanism::Evolution => plan.evolve = objects,
546            Mechanism::Replacement => plan.replace = objects,
547        }
548
549        plan
550    }
551
552    /// Upgrade the migration shard to the target version.
553    async fn upgrade_migration_shard_version(&self) {
554        let persist = &self.config.persist_client;
555        let diagnostics = Diagnostics {
556            shard_name: "builtin_migration".to_string(),
557            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
558        };
559
560        persist
561            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
562                self.migration_shard,
563                diagnostics,
564            )
565            .await
566            .expect("valid usage");
567    }
568
569    /// Migrate the given objects using the `Evolution` mechanism.
570    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
571        for object in objects {
572            self.migrate_evolve_one(object).await?;
573        }
574        Ok(())
575    }
576
    /// Migrate a single object's persist shard using persist schema evolution.
    ///
    /// In read-only mode this only verifies that the target schema is backward compatible with
    /// the currently registered one; the actual registration is left to a leader process. In
    /// leader mode it registers or evolves the shard's schema, retrying on concurrent changes.
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let persist = &self.config.persist_client;

        let Some(object_info) = self.system_objects.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let id = object_info.global_id;

        let Some(shard_id) = object_info.shard_id else {
            // No shard is registered for this builtin. In leader mode, this is fine, we'll
            // register the shard during bootstrap. In read-only mode, we might be racing with the
            // leader to register the shard and it's unclear what sort of confusion can arise from
            // that -- better to bail out in this case.
            if self.config.read_only {
                bail!("missing shard ID for builtin {object:?} ({id})");
            } else {
                return Ok(());
            }
        };

        // The target schema comes from the builtin's static definition.
        let target_desc = match object_info.builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::MaterializedView(mv) => &mv.desc,
            Builtin::ContinualTask(ct) => &ct.desc,
            _ => bail!("not a storage collection: {object:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        if self.config.read_only {
            // In read-only mode, only check that the new schema is backward compatible.
            // We'll register it when/if we restart in leader mode.
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                         {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                // If no schema was previously registered, simply try to register the new one. This
                // might fail due to a concurrent registration, in which case we'll fall back to
                // `compare_and_evolve_schema`.

                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                if schema_id.is_some() {
                    // Registration succeeded; the shard now carries the target schema.
                    return Ok(());
                }

                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        loop {
            // Evolving the schema might fail if another process evolved the schema concurrently,
            // in which case we need to retry. Most likely the other process evolved the schema to
            // our own target schema and the second try is a no-op.

            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                     {source_desc:?} -> {target_desc:?}"
                ),
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    // Lost the race: adopt the concurrently registered schema and retry.
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }
707
708    /// Migrate the given objects using the `Replacement` mechanism.
709    async fn migrate_replace(
710        &self,
711        objects: &[SystemObjectDescription],
712    ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
713        if objects.is_empty() {
714            return Ok(Default::default());
715        }
716
717        let diagnostics = Diagnostics {
718            shard_name: "builtin_migration".to_string(),
719            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
720        };
721        let (mut persist_write, mut persist_read) =
722            self.open_migration_shard(diagnostics.clone()).await;
723
724        let mut ids_to_replace = BTreeSet::new();
725        for object in objects {
726            if let Some(info) = self.system_objects.get(object) {
727                ids_to_replace.insert(info.global_id);
728            } else {
729                bail!("missing id for builtin {object:?}");
730            }
731        }
732
733        info!(?objects, ?ids_to_replace, "migrating by replacement");
734
735        // Fetch replacement shard IDs from the migration shard, or insert new ones if none exist.
736        // This can fail due to writes by concurrent processes, so we need to retry.
737        let replaced_shards = loop {
738            if let Some(shards) = self
739                .try_get_or_insert_replacement_shards(
740                    &ids_to_replace,
741                    &mut persist_write,
742                    &mut persist_read,
743                )
744                .await?
745            {
746                break shards;
747            }
748        };
749
750        Ok(replaced_shards)
751    }
752
    /// Try to get or insert replacement shards for the given IDs into the migration shard, at
    /// `target_version` and `deploy_generation`.
    ///
    /// This method looks for existing entries in the migration shards and returns those if they
    /// are present. Otherwise it generates new shard IDs and tries to insert them.
    ///
    /// The result of this call is `None` if no existing entries were found and inserting new ones
    /// failed because of a concurrent write to the migration shard. In this case, the caller is
    /// expected to retry.
    async fn try_get_or_insert_replacement_shards(
        &self,
        ids_to_replace: &BTreeSet<GlobalId>,
        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
        // Write at the current upper, so the compare-and-append below reliably detects any
        // concurrent writer that advanced the shard in the meantime.
        let upper = persist_write.fetch_recent_upper().await;
        let write_ts = *upper.as_option().expect("migration shard not sealed");

        // Local working copies: IDs still needing a replacement shard, and the result map.
        let mut ids_to_replace = ids_to_replace.clone();
        let mut replaced_shards = BTreeMap::new();

        // Another process might already have done a shard replacement at our version and
        // generation, in which case we can directly reuse the replacement shards.
        //
        // Note that we can't assume that the previous process had the same `ids_to_replace` as we
        // do. The set of migrations to run depends on both the source and the target version, and
        // the migration shard is not keyed by source version. The previous writer might have seen
        // a different source version, if there was a concurrent migration by a leader process.
        if let Some(read_ts) = write_ts.step_back() {
            // Only entries for our exact target version and deploy generation are reusable.
            let pred = |key: &migration_shard::Key| {
                key.build_version == self.target_version
                    && key.deploy_generation == Some(self.deploy_generation)
            };
            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
                for (key, shard_id) in entries {
                    let id = GlobalId::System(key.global_id);
                    // Only adopt entries we actually need; leftover entries from a writer with a
                    // different source version are ignored here.
                    if ids_to_replace.remove(&id) {
                        replaced_shards.insert(id, shard_id);
                    }
                }

                debug!(
                    %read_ts, ?replaced_shards, ?ids_to_replace,
                    "found existing entries in migration shard",
                );
            }

            // All required entries already exist, so no write is necessary.
            if ids_to_replace.is_empty() {
                return Ok(Some(replaced_shards));
            }
        }

        // Generate new shard IDs and attempt to insert them into the migration shard. If we get a
        // CaA failure at `write_ts` that means a concurrent process has inserted in the meantime
        // and we need to re-check the migration shard contents.
        let mut updates = Vec::new();
        for id in ids_to_replace {
            let shard_id = ShardId::new();
            replaced_shards.insert(id, shard_id);

            // Only system collections are subject to builtin schema migration.
            let GlobalId::System(global_id) = id else {
                bail!("attempt to migrate a non-system collection: {id}");
            };
            let key = migration_shard::Key {
                global_id,
                build_version: self.target_version.clone(),
                deploy_generation: Some(self.deploy_generation),
            };
            // Diff +1: an insertion into the migration shard.
            updates.push(((key, shard_id), write_ts, 1));
        }

        let upper = Antichain::from_elem(write_ts);
        let new_upper = Antichain::from_elem(write_ts.step_forward());
        debug!(%write_ts, "attempting insert into migration shard");
        let result = persist_write
            .compare_and_append(updates, upper, new_upper)
            .await
            .expect("valid usage");

        match result {
            Ok(()) => {
                debug!(
                    %write_ts, ?replaced_shards,
                    "successfully inserted into migration shard"
                );
                Ok(Some(replaced_shards))
            }
            // Lost the race against a concurrent writer; signal the caller to re-read and retry.
            Err(_mismatch) => Ok(None),
        }
    }
843
844    /// Open writer and reader for the migration shard.
845    async fn open_migration_shard(
846        &self,
847        diagnostics: Diagnostics,
848    ) -> (
849        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
850        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
851    ) {
852        let persist = &self.config.persist_client;
853
854        persist
855            .open(
856                self.migration_shard,
857                Arc::new(migration_shard::KeySchema),
858                Arc::new(ShardIdSchema),
859                diagnostics,
860                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
861            )
862            .await
863            .expect("valid usage")
864    }
865
866    /// Open a [`SinceHandle`] for the migration shard.
867    async fn open_migration_shard_since(
868        &self,
869        diagnostics: Diagnostics,
870    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
871        self.config
872            .persist_client
873            .open_critical_since(
874                self.migration_shard,
875                // TODO: We may need to use a different critical reader
876                // id for this if we want to be able to introspect it via SQL.
877                PersistClient::CONTROLLER_CRITICAL_SINCE,
878                Opaque::encode(&i64::MIN),
879                diagnostics.clone(),
880            )
881            .await
882            .expect("valid usage")
883    }
884
885    /// Update the fingerprints for `migrated_items`.
886    ///
887    /// Returns the new fingerprints. Also asserts that the current fingerprints of all other
888    /// system items match their builtin definitions.
889    fn update_fingerprints(
890        &self,
891        migrated_items: &BTreeSet<SystemObjectDescription>,
892    ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
893        let mut new_fingerprints = BTreeMap::new();
894        for (object, object_info) in &self.system_objects {
895            let id = object_info.global_id;
896            let builtin = object_info.builtin;
897
898            let fingerprint = builtin.fingerprint();
899            if fingerprint == object_info.fingerprint {
900                continue; // fingerprint unchanged, nothing to do
901            }
902
903            // Fingerprint mismatch is expected for a migrated item.
904            let migrated = migrated_items.contains(object);
905            // Some builtin types have schemas but no durable state. No migration needed for those.
906            let ephemeral = matches!(
907                builtin,
908                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
909            );
910
911            if migrated || ephemeral {
912                new_fingerprints.insert(object.clone(), fingerprint);
913            } else if builtin.runtime_alterable() {
914                // Runtime alterable builtins have no meaningful builtin fingerprint, and a
915                // sentinel value stored in the catalog.
916                assert_eq!(
917                    object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
918                    "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
919                );
920            } else {
921                panic!(
922                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
923                    fingerprint, object_info.fingerprint,
924                );
925            }
926        }
927
928        Ok(new_fingerprints)
929    }
930
    /// Perform cleanup of migration state, i.e. the migration shard.
    ///
    /// Returns a list of shards to finalize, and a `Future` that must be run after the shard
    /// finalization has been durably enqueued. The `Future` is used to remove entries from the
    /// migration shard only after we know the respective shards will be finalized. Removing
    /// entries immediately would risk leaking the shards.
    ///
    /// We only perform cleanup in leader mode, to keep the durable state changes made by read-only
    /// processes as minimal as possible. Given that Materialize doesn't support version downgrades,
    /// it is safe to assume that any state for versions below the `target_version` is not needed
    /// anymore and can be cleaned up.
    ///
    /// Note that it is fine for cleanup to sometimes fail or be skipped. The size of the migration
    /// shard should always be pretty small, so keeping migration state around for longer isn't a
    /// concern. As a result, we can keep the logic simple here and skip doing cleanup in response
    /// to transient failures, instead of retrying.
    async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
        // "Nothing to finalize, nothing to retract" result, used for all early-outs.
        let noop_action = async {}.boxed();
        let noop_result = (BTreeSet::new(), noop_action);

        if self.config.read_only {
            return Ok(noop_result);
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: "builtin schema migration cleanup".into(),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;
        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;

        // Retract at the current upper; the CaA in `cleanup_action` below then fails harmlessly
        // if anyone else writes in the meantime. If there is no time before `write_ts`, the shard
        // was never written to, so there is nothing to clean up.
        let upper = persist_write.fetch_recent_upper().await.clone();
        let write_ts = *upper.as_option().expect("migration shard not sealed");
        let Some(read_ts) = write_ts.step_back() else {
            return Ok(noop_result);
        };

        // Collect old entries to remove.
        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
        else {
            return Ok(noop_result);
        };

        debug!(
            ?stale_entries,
            "cleaning migration shard up to version {}", self.target_version,
        );

        // The shards currently backing builtin collections, keyed by their `GlobalId`.
        let current_shards: BTreeMap<_, _> = self
            .system_objects
            .values()
            .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
            .collect();

        let mut shards_to_finalize = BTreeSet::new();
        let mut retractions = Vec::new();
        for (key, shard_id) in stale_entries {
            // The migration shard contains both shards created during aborted upgrades and shards
            // created during successful upgrades. The latter may still be in use, so we have to
            // check and only finalize those that aren't anymore.
            let gid = GlobalId::System(key.global_id);
            if current_shards.get(&gid) != Some(&shard_id) {
                shards_to_finalize.insert(shard_id);
            }

            // Diff -1: retract the stale entry, regardless of whether its shard gets finalized.
            retractions.push(((key, shard_id), write_ts, -1));
        }

        // Deferred retraction: the caller runs this only after shard finalization has been
        // durably enqueued (see doc comment). A CaA mismatch just means a concurrent writer won;
        // the retraction is then skipped and retried on a later cleanup.
        let cleanup_action = async move {
            if !retractions.is_empty() {
                let new_upper = Antichain::from_elem(write_ts.step_forward());
                let result = persist_write
                    .compare_and_append(retractions, upper, new_upper)
                    .await
                    .expect("valid usage");
                match result {
                    Ok(()) => debug!("cleaned up migration shard"),
                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
                }
            }
        }
        .boxed();

        // Downgrade the since, to enable some compaction.
        let o = persist_since.opaque().clone();
        let new_since = Antichain::from_elem(read_ts);
        let result = persist_since
            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
            .await;
        // An opaque mismatch is unexpected but benign here: we only lose some compaction.
        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");

        Ok((shards_to_finalize, cleanup_action))
    }
1026}
1027
1028/// Read the migration shard at the given timestamp, returning all entries that match the given
1029/// predicate.
1030///
1031/// Returns `None` if the migration shard contains no matching entries, or if it isn't readable at
1032/// `read_ts`.
1033async fn read_migration_shard<P>(
1034    persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
1035    read_ts: Timestamp,
1036    predicate: P,
1037) -> Option<Vec<(migration_shard::Key, ShardId)>>
1038where
1039    P: for<'a> Fn(&migration_shard::Key) -> bool,
1040{
1041    let as_of = Antichain::from_elem(read_ts);
1042    let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
1043
1044    assert!(
1045        updates.iter().all(|(_, _, diff)| *diff == 1),
1046        "migration shard contains invalid diffs: {updates:?}",
1047    );
1048
1049    let entries: Vec<_> = updates
1050        .into_iter()
1051        .map(|(data, _, _)| data)
1052        .filter(move |(key, _)| predicate(key))
1053        .collect();
1054
1055    (!entries.is_empty()).then_some(entries)
1056}
1057
/// A plan to migrate between two versions.
///
/// Which objects end up in which list is determined by the `MIGRATIONS` entries that apply to
/// the version range being crossed; see the module docs for how mechanisms are selected.
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate using the `Evolution` mechanism.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate using the `Replacement` mechanism.
    replace: Vec<SystemObjectDescription>,
}
1066
/// Types and persist codec impls for the migration shard used by the `Replacement` mechanism.
///
/// The shard maps a [`Key`] (collection, build version, deploy generation) to the `ShardId` of
/// the replacement shard created for it. Keys are persisted as strings, in one of two formats
/// (see [`Key`]'s `Display`/`FromStr` impls), so changes here must stay readable against
/// already-written data.
mod migration_shard {
    use std::fmt;
    use std::str::FromStr;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;
    use semver::Version;
    use serde::{Deserialize, Serialize};

    /// Key of the migration shard.
    ///
    /// Identifies the builtin collection (by its raw system `GlobalId` value) and the upgrade
    /// (by target build version and deploy generation) for which a replacement shard exists.
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
    pub(super) struct Key {
        /// Raw value of the migrated collection's `GlobalId::System`.
        pub(super) global_id: u64,
        /// The build version the replacement shard was created for.
        pub(super) build_version: Version,
        // Versions < 26.0 didn't include the deploy generation. As long as we still might
        // encounter migration shard entries that don't have it, we need to keep this an `Option`
        // and keep supporting both key formats.
        pub(super) deploy_generation: Option<u64>,
    }

    impl fmt::Display for Key {
        // Renders the key in the format it is persisted in: JSON for keys with a deploy
        // generation, the legacy `{global_id}-{build_version}` format otherwise. This keeps
        // pre-26.0 entries roundtripping byte-identically.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            if self.deploy_generation.is_some() {
                // current format
                let s = serde_json::to_string(self).expect("JSON serializable");
                f.write_str(&s)
            } else {
                // pre-26.0 format
                write!(f, "{}-{}", self.global_id, self.build_version)
            }
        }
    }

    impl FromStr for Key {
        type Err = String;

        // Inverse of `Display`: try the current JSON format first, then fall back to the
        // pre-26.0 format.
        fn from_str(s: &str) -> Result<Self, String> {
            // current format
            if let Ok(key) = serde_json::from_str(s) {
                return Ok(key);
            };

            // pre-26.0 format
            //
            // `splitn(2, '-')` splits only at the first '-', so build versions that themselves
            // contain a '-' (e.g. pre-release versions like `0.26.0-dev.0`) parse correctly.
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid Key '{s}'"));
            };
            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
            let build_version = build_version
                .parse::<Version>()
                .map_err(|e| e.to_string())?;
            Ok(Key {
                global_id,
                build_version,
                deploy_generation: None,
            })
        }
    }

    // Placeholder value only: `SimpleColumnarData::read` below overwrites the whole value.
    // NOTE(review): presumably required by the columnar decoding machinery — confirm.
    impl Default for Key {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: Version::new(0, 0, 0),
                deploy_generation: Some(0),
            }
        }
    }

    impl Codec for Key {
        type Schema = KeySchema;
        type Storage = ();

        fn codec_name() -> String {
            // NOTE(review): the codec name doesn't match the type name; presumably kept as
            // "TableKey" for compatibility with already-registered shard metadata — confirm
            // before renaming, as the codec name is part of the persisted identity.
            "TableKey".into()
        }

        // Keys are encoded as the UTF-8 bytes of their `Display` rendering.
        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put(self.to_string().as_bytes())
        }

        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
            s.parse()
        }

        // `KeySchema` carries no data, so the encoded schema is always empty.
        fn encode_schema(_schema: &KeySchema) -> Bytes {
            Bytes::new()
        }

        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            KeySchema
        }
    }

    // Columnar representation: one string column holding the `Display` rendering of each key.
    impl SimpleColumnarData for Key {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        // Number of payload bytes currently buffered in the builder.
        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }

        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }

        // Fully replaces `self`; any previously written shard contents must parse as a `Key`.
        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("valid Key");
        }
    }

    /// Stateless persist schema for [`Key`]; see [`SimpleColumnarData`] for the actual encoding.
    #[derive(Debug, PartialEq)]
    pub(super) struct KeySchema;

    impl Schema<Key> for KeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;
        type Decoder = SimpleColumnarDecoder<Key>;
        type Encoder = SimpleColumnarEncoder<Key>;

        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}
1208
1209#[cfg(test)]
1210#[path = "builtin_schema_migration_tests.rs"]
1211mod tests;