// mz_adapter/catalog/open/builtin_schema_migration.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for migrating the schemas of builtin storage collections.
//!
//! If a version upgrade changes the schema of a builtin collection that's made durable in persist,
//! that persist shard's schema must be migrated accordingly. The migration must happen in a way
//! that's compatible with 0dt upgrades: Read-only environments need to be able to read the
//! collections with the new schema, without interfering with the leader environment's continued
//! use of the old schema.
//!
//! Two migration mechanisms are provided:
//!
//!  * [`Mechanism::Evolution`] uses persist's schema evolution support to evolve the persist
//!    shard's schema in-place. Only works for backward-compatible changes.
//!  * [`Mechanism::Replacement`] creates a new shard to serve the builtin collection in the new
//!    version. Works for all schema changes but discards existing data.
//!
//! Which mechanism to use is selected through entries in the `MIGRATIONS` list. In general, the
//! `Evolution` mechanism should be used when possible, as it avoids data loss.
//!
//! For more context and details on the implementation, see
//! `doc/developer/design/20251015_builtin_schema_migration.md`.
31use std::collections::{BTreeMap, BTreeSet};
32use std::sync::Arc;
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39    BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40    MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
41};
42use mz_catalog::config::BuiltinItemMigrationConfig;
43use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
44use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
45use mz_catalog::memory::error::{Error, ErrorKind};
46use mz_ore::soft_assert_or_log;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
48use mz_persist_client::critical::{Opaque, SinceHandle};
49use mz_persist_client::read::ReadHandle;
50use mz_persist_client::schema::CaESchema;
51use mz_persist_client::write::WriteHandle;
52use mz_persist_client::{Diagnostics, PersistClient};
53use mz_persist_types::ShardId;
54use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
55use mz_persist_types::schema::backward_compatible;
56use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
57use mz_repr::{CatalogItemId, GlobalId, Timestamp};
58use mz_sql::catalog::{CatalogItemType, NameReference};
59use mz_storage_client::controller::StorageTxn;
60use mz_storage_types::StorageDiff;
61use mz_storage_types::sources::SourceData;
62use semver::Version;
63use timely::progress::Antichain;
64use tracing::{debug, info};
65
66use crate::catalog::migrate::get_migration_version;
67
/// Builtin schema migrations required to upgrade to the current build version.
///
/// Each entry names the version at which the migration becomes necessary, the builtin object
/// affected, and the mechanism to use.
///
/// Migration steps for old versions must be retained around according to the upgrade policy. For
/// example, if we support upgrading one major version at a time, the release of version `N.0.0`
/// can delete all migration steps with versions before `(N-1).0.0`.
///
/// Smallest supported version: 0.147.0
const MIGRATIONS: &[MigrationStep] = &[
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_sink_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_source_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 159, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_cluster_replica_metrics_history",
        },
        mechanism: Mechanism::Evolution,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_roles",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_sinks",
        },
        mechanism: Mechanism::Replacement,
    },
];
122
/// A migration required to upgrade past a specific version.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The build version that requires this migration.
    ///
    /// Steps at or below the durable source version are skipped (see `Migration::plan_migration`).
    version: Version,
    /// The object that requires migration.
    object: Object,
    /// The migration mechanism to be used.
    mechanism: Mechanism,
}
133
/// The mechanism to use to migrate the schema of a builtin collection.
///
/// Note: the derived `Ord` follows declaration order, so `Evolution < Replacement`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[allow(dead_code)]
enum Mechanism {
    /// Persist schema evolution.
    ///
    /// Keeps existing contents but only works for schema changes that are backward compatible
    /// according to [`backward_compatible`].
    Evolution,
    /// Shard replacement.
    ///
    /// Works for arbitrary schema changes but loses existing contents.
    Replacement,
}
148
/// The object of a migration.
///
/// This has the same information as [`SystemObjectDescription`] but can be constructed in `const`
/// contexts, like the `MIGRATIONS` list.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Object {
    /// The catalog item type of the builtin (e.g. table or source).
    type_: CatalogItemType,
    /// The name of the schema containing the builtin.
    schema: &'static str,
    /// The name of the builtin object itself.
    name: &'static str,
}
159
160impl From<Object> for SystemObjectDescription {
161    fn from(object: Object) -> Self {
162        SystemObjectDescription {
163            schema_name: object.schema.into(),
164            object_type: object.type_,
165            object_name: object.name.into(),
166        }
167    }
168}
169
/// The result of a builtin schema migration.
pub(super) struct MigrationResult {
    /// IDs of items whose shards have been replaced using the `Replacement` mechanism.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// A cleanup action to take once the migration has been made durable.
    ///
    /// Defaults to a no-op future when no migration ran (see the `Default` impl).
    pub cleanup_action: BoxFuture<'static, ()>,
}
177
178impl Default for MigrationResult {
179    fn default() -> Self {
180        Self {
181            replaced_items: Default::default(),
182            cleanup_action: async {}.boxed(),
183        }
184    }
185}
186
/// Run builtin schema migrations.
///
/// This is the entry point used by adapter when opening the catalog. It uses the hardcoded
/// `BUILTINS` and `MIGRATIONS` lists to initialize the lists of available builtins and required
/// migrations, respectively.
///
/// Returns the set of catalog items whose shards were replaced, plus a cleanup action the caller
/// must run once the migration outcome has been made durable.
pub(super) async fn run(
    build_info: &BuildInfo,
    deploy_generation: u64,
    txn: &mut Transaction<'_>,
    config: BuiltinItemMigrationConfig,
) -> Result<MigrationResult, Error> {
    // Sanity check to ensure we're not touching durable state in read-only mode.
    assert_eq!(config.read_only, txn.is_savepoint());

    // Tests may provide a dummy build info that confuses the migration step selection logic. Skip
    // migrations if we observe this build info.
    if *build_info == DUMMY_BUILD_INFO {
        return Ok(MigrationResult::default());
    }

    let Some(durable_version) = get_migration_version(txn) else {
        // New catalog; nothing to do.
        return Ok(MigrationResult::default());
    };
    let build_version = build_info.semver_version();

    // Assemble an `ObjectInfo` for every system object: its global ID, its persist shard (if one
    // is registered in the collection metadata), the static builtin definition, and the stored
    // fingerprint. A mapping without a matching entry in `BUILTIN_LOOKUP` is a bug.
    let collection_metadata = txn.get_collection_metadata();
    let system_objects = txn
        .get_system_object_mappings()
        .map(|m| {
            let object = m.description;
            let global_id = m.unique_identifier.global_id;
            let shard_id = collection_metadata.get(&global_id).copied();
            let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
                panic!("missing builtin {object:?}");
            };
            let info = ObjectInfo {
                global_id,
                shard_id,
                builtin,
                fingerprint: m.unique_identifier.fingerprint,
            };
            (object, info)
        })
        .collect();

    let migration_shard = txn.get_builtin_migration_shard().expect("must exist");

    let migration = Migration {
        source_version: durable_version.clone(),
        target_version: build_version.clone(),
        deploy_generation,
        system_objects,
        migration_shard,
        config,
    };

    // Run the migration, translating any failure into a catalog error that records both the
    // source and the target version.
    let result = migration.run(MIGRATIONS).await.map_err(|e| {
        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
            last_seen_version: durable_version.to_string(),
            this_version: build_version.to_string(),
            cause: e.to_string(),
        })
    })?;

    // Write the migration outcome (new shards, fingerprints, finalization candidates) into the
    // catalog transaction.
    result.apply(txn);

    // Map the replaced global IDs back to catalog item IDs for the caller.
    let replaced_items = txn
        .get_system_object_mappings()
        .map(|m| m.unique_identifier)
        .filter(|ids| result.new_shards.contains_key(&ids.global_id))
        .map(|ids| ids.catalog_id)
        .collect();

    Ok(MigrationResult {
        replaced_items,
        cleanup_action: result.cleanup_action,
    })
}
266
/// Result produced by `Migration::run`.
struct MigrationRunResult {
    /// Replacement shard IDs, keyed by the global ID of the collection they serve.
    new_shards: BTreeMap<GlobalId, ShardId>,
    /// Updated fingerprints for migrated objects, keyed by object description.
    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
    /// Shards to register for finalization, in addition to the shards superseded by `new_shards`.
    shards_to_finalize: BTreeSet<ShardId>,
    /// An action to take once the migration has been made durable.
    cleanup_action: BoxFuture<'static, ()>,
}
274
275impl Default for MigrationRunResult {
276    fn default() -> Self {
277        Self {
278            new_shards: BTreeMap::new(),
279            new_fingerprints: BTreeMap::new(),
280            shards_to_finalize: BTreeSet::new(),
281            cleanup_action: async {}.boxed(),
282        }
283    }
284}
285
impl MigrationRunResult {
    /// Apply this migration result to the given transaction.
    ///
    /// Swaps in the replacement shard IDs, registers the superseded shards (and any extra shards
    /// marked by the migration) for finalization, and records the new builtin fingerprints.
    fn apply(&self, txn: &mut Transaction<'_>) {
        // Update collection metadata: point each replaced global ID at its new shard.
        let replaced_ids = self.new_shards.keys().copied().collect();
        let old_metadata = txn.delete_collection_metadata(replaced_ids);
        txn.insert_collection_metadata(self.new_shards.clone())
            .expect("inserting unique shards IDs after deleting existing entries");

        // Register shards for finalization: the old shards we just unlinked, plus any additional
        // shards the migration flagged in `shards_to_finalize`.
        let mut unfinalized_shards: BTreeSet<_> =
            old_metadata.into_iter().map(|(_, sid)| sid).collect();
        unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
        txn.insert_unfinalized_shards(unfinalized_shards)
            .expect("cannot fail");

        // Update fingerprints: rewrite only those mappings for which a new fingerprint was
        // recorded, preserving catalog and global IDs.
        let mappings = txn
            .get_system_object_mappings()
            .filter_map(|m| {
                let fingerprint = self.new_fingerprints.get(&m.description)?;
                Some(SystemObjectMapping {
                    description: m.description,
                    unique_identifier: SystemObjectUniqueIdentifier {
                        catalog_id: m.unique_identifier.catalog_id,
                        global_id: m.unique_identifier.global_id,
                        fingerprint: fingerprint.clone(),
                    },
                })
            })
            .collect();
        txn.set_system_object_mappings(mappings)
            .expect("filtered existing mappings remain unique");
    }
}
321
/// Information about a system object required to run a `Migration`.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// The global ID of the object's storage collection.
    global_id: GlobalId,
    /// The persist shard backing the collection, if one is registered in the catalog.
    shard_id: Option<ShardId>,
    /// The static definition of the builtin.
    builtin: &'static Builtin<NameReference>,
    /// The object's fingerprint, as recorded in the durable catalog.
    fingerprint: String,
}
330
/// Context of a builtin schema migration.
struct Migration {
    /// The version we are migrating from.
    ///
    /// Same as the build version of the most recent leader process that successfully performed
    /// migrations.
    source_version: Version,
    /// The version we are migrating to.
    ///
    /// Same as the build version of this process.
    target_version: Version,
    /// The deploy generation of this process.
    deploy_generation: u64,
    /// Information about all objects in the system.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// The ID of the migration shard.
    migration_shard: ShardId,
    /// Additional configuration.
    config: BuiltinItemMigrationConfig,
}
351
352impl Migration {
    /// Execute the migration described by the given steps.
    ///
    /// Returns the changes to be written back to the catalog, or an error if the migration could
    /// not be performed (e.g. on attempted downgrade or incompatible schema evolution).
    async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
        info!(
            deploy_generation = %self.deploy_generation,
            "running builtin schema migration: {} -> {}",
            self.source_version, self.target_version
        );

        self.validate_migration_steps(steps);

        // If a force-migration mechanism is configured, migrate all eligible objects with that
        // mechanism instead of deriving the plan from the steps.
        let (force, plan) = match self.config.force_migration.as_deref() {
            None => (false, self.plan_migration(steps)),
            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
            Some(other) => panic!("unknown force migration mechanism: {other}"),
        };

        if self.source_version == self.target_version && !force {
            info!("skipping migration: already at target version");
            return Ok(MigrationRunResult::default());
        } else if self.source_version > self.target_version {
            bail!("downgrade not supported");
        }

        // In leader mode, upgrade the version of the migration shard to the target version.
        // This fences out any readers at lower versions.
        if !self.config.read_only {
            self.upgrade_migration_shard_version().await;
        }

        info!("executing migration plan: {plan:?}");

        // Run both mechanisms; only `Replacement` produces new shard IDs.
        self.migrate_evolve(&plan.evolve).await?;
        let new_shards = self.migrate_replace(&plan.replace).await?;

        let mut migrated_objects = BTreeSet::new();
        migrated_objects.extend(plan.evolve);
        migrated_objects.extend(plan.replace);

        // Compute the updated fingerprints for everything we migrated.
        let new_fingerprints = self.update_fingerprints(&migrated_objects)?;

        let (shards_to_finalize, cleanup_action) = self.cleanup().await?;

        Ok(MigrationRunResult {
            new_shards,
            new_fingerprints,
            shards_to_finalize,
            cleanup_action,
        })
    }
402
    /// Sanity check the given migration steps.
    ///
    /// If any of these checks fail, that's a bug in Materialize, and we panic immediately.
    ///
    /// # Panics
    ///
    /// Panics if a step's version exceeds the target version, if a step targets
    /// `mz_storage_usage_by_shard` or `mz_catalog_raw`, if it names a non-existent builtin, or if
    /// the builtin is not a storage collection (table, source, or continual task).
    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
        for step in steps {
            assert!(
                step.version <= self.target_version,
                "migration step version greater than target version: {} > {}",
                step.version,
                self.target_version,
            );

            let object = SystemObjectDescription::from(step.object.clone());

            // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly, it would
            // cause the table to be truncated because the contents are not also stored in the durable
            // catalog. Secondly, we prune `mz_storage_usage_by_shard` of old events in the background
            // on startup. The correctness of that pruning relies on there being no other retractions
            // to `mz_storage_usage_by_shard`.
            //
            // TODO: Confirm the above reasoning, it might be outdated?
            assert_ne!(
                *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
            );

            // `mz_catalog_raw` cannot be migrated because it contains the durable catalog and it
            // wouldn't be very durable if we allowed it to be truncated.
            assert_ne!(
                *MZ_CATALOG_RAW_DESCRIPTION, object,
                "mz_catalog_raw cannot be migrated"
            );

            let Some(object_info) = self.system_objects.get(&object) else {
                panic!("migration step for non-existent builtin: {object:?}");
            };

            // Only storage collections are backed by persist shards that can be migrated.
            let builtin = object_info.builtin;
            use Builtin::*;
            assert!(
                matches!(builtin, Table(..) | Source(..) | ContinualTask(..)),
                "schema migration not supported for builtin: {builtin:?}",
            );
        }
    }
448
449    /// Select for each object to migrate the appropriate migration mechanism.
450    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
451        // Ignore any steps at versions before `source_version`.
452        let steps = steps.iter().filter(|s| s.version > self.source_version);
453
454        // Select a mechanism for each object, according to the requested migrations:
455        //  * If any `Replacement` was requested, use `Replacement`.
456        //  * Otherwise, (i.e. only `Evolution` was requested), use `Evolution`.
457        let mut by_object = BTreeMap::new();
458        for step in steps {
459            if let Some(entry) = by_object.get_mut(&step.object) {
460                *entry = match (step.mechanism, *entry) {
461                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
462                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
463                        Mechanism::Replacement
464                    }
465                };
466            } else {
467                by_object.insert(step.object.clone(), step.mechanism);
468            }
469        }
470
471        let mut plan = Plan::default();
472        for (object, mechanism) in by_object {
473            match mechanism {
474                Mechanism::Evolution => plan.evolve.push(object.into()),
475                Mechanism::Replacement => plan.replace.push(object.into()),
476            }
477        }
478
479        plan
480    }
481
482    /// Plan a forced migration of all objects using the given mechanism.
483    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
484        let objects = self
485            .system_objects
486            .iter()
487            .filter(|(_, info)| {
488                use Builtin::*;
489                match info.builtin {
490                    Table(..) | ContinualTask(..) => true,
491                    Source(source) => **source != *MZ_CATALOG_RAW,
492                    Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
493                }
494            })
495            .map(|(object, _)| object.clone())
496            .collect();
497
498        let mut plan = Plan::default();
499        match mechanism {
500            Mechanism::Evolution => plan.evolve = objects,
501            Mechanism::Replacement => plan.replace = objects,
502        }
503
504        plan
505    }
506
507    /// Upgrade the migration shard to the target version.
508    async fn upgrade_migration_shard_version(&self) {
509        let persist = &self.config.persist_client;
510        let diagnostics = Diagnostics {
511            shard_name: "builtin_migration".to_string(),
512            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
513        };
514
515        persist
516            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
517                self.migration_shard,
518                diagnostics,
519            )
520            .await
521            .expect("valid usage");
522    }
523
524    /// Migrate the given objects using the `Evolution` mechanism.
525    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
526        for object in objects {
527            self.migrate_evolve_one(object).await?;
528        }
529        Ok(())
530    }
531
    /// Migrate a single object using the `Evolution` mechanism.
    ///
    /// In read-only mode this only verifies that the target schema is backward compatible with
    /// the registered one. In leader mode it registers (or evolves to) the target schema with
    /// persist, retrying on concurrent schema changes by other processes.
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let persist = &self.config.persist_client;

        let Some(object_info) = self.system_objects.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let id = object_info.global_id;

        let Some(shard_id) = object_info.shard_id else {
            // No shard is registered for this builtin. In leader mode, this is fine, we'll
            // register the shard during bootstrap. In read-only mode, we might be racing with the
            // leader to register the shard and it's unclear what sort of confusion can arise from
            // that -- better to bail out in this case.
            if self.config.read_only {
                bail!("missing shard ID for builtin {object:?} ({id})");
            } else {
                return Ok(());
            }
        };

        // The target schema is the builtin's statically declared relation desc.
        let target_desc = match object_info.builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::ContinualTask(ct) => &ct.desc,
            _ => bail!("not a storage collection: {object:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        if self.config.read_only {
            // In read-only mode, only check that the new schema is backward compatible.
            // We'll register it when/if we restart in leader mode.
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                         {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                // If no schema was previously registered, simply try to register the new one. This
                // might fail due to a concurrent registration, in which case we'll fall back to
                // `compare_and_evolve_schema`.

                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                if schema_id.is_some() {
                    // Registration succeeded, so the shard now carries the target schema.
                    return Ok(());
                }

                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        loop {
            // Evolving the schema might fail if another process evolved the schema concurrently,
            // in which case we need to retry. Most likely the other process evolved the schema to
            // our own target schema and the second try is a no-op.

            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                     {source_desc:?} -> {target_desc:?}"
                ),
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    // Our expected schema was stale; retry against the observed one.
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }
661
    /// Migrate the given objects using the `Replacement` mechanism.
    ///
    /// Returns the replacement shard IDs, keyed by the global ID of the collection they replace.
    async fn migrate_replace(
        &self,
        objects: &[SystemObjectDescription],
    ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
        // Fast path: nothing to replace, so don't bother opening the migration shard.
        if objects.is_empty() {
            return Ok(Default::default());
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;

        // Resolve each object to its global ID; every object must be a known builtin.
        let mut ids_to_replace = BTreeSet::new();
        for object in objects {
            if let Some(info) = self.system_objects.get(object) {
                ids_to_replace.insert(info.global_id);
            } else {
                bail!("missing id for builtin {object:?}");
            }
        }

        info!(?objects, ?ids_to_replace, "migrating by replacement");

        // Fetch replacement shard IDs from the migration shard, or insert new ones if none exist.
        // This can fail due to writes by concurrent processes, so we need to retry.
        let replaced_shards = loop {
            if let Some(shards) = self
                .try_get_or_insert_replacement_shards(
                    &ids_to_replace,
                    &mut persist_write,
                    &mut persist_read,
                )
                .await?
            {
                break shards;
            }
        };

        Ok(replaced_shards)
    }
706
    /// Try to get or insert replacement shards for the given IDs into the migration shard, at
    /// `target_version` and `deploy_generation`.
    ///
    /// This method looks for existing entries in the migration shards and returns those if they
    /// are present. Otherwise it generates new shard IDs and tries to insert them.
    ///
    /// The result of this call is `None` if no existing entries were found and inserting new ones
    /// failed because of a concurrent write to the migration shard. In this case, the caller is
    /// expected to retry.
    async fn try_get_or_insert_replacement_shards(
        &self,
        ids_to_replace: &BTreeSet<GlobalId>,
        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
        // The shard's upper determines both our write timestamp and (stepped back) the
        // timestamp at which we can read existing entries.
        let upper = persist_write.fetch_recent_upper().await;
        let write_ts = *upper.as_option().expect("migration shard not sealed");

        let mut ids_to_replace = ids_to_replace.clone();
        let mut replaced_shards = BTreeMap::new();

        // Another process might already have done a shard replacement at our version and
        // generation, in which case we can directly reuse the replacement shards.
        //
        // Note that we can't assume that the previous process had the same `ids_to_replace` as we
        // do. The set of migrations to run depends on both the source and the target version, and
        // the migration shard is not keyed by source version. The previous writer might have seen
        // a different source version, if there was a concurrent migration by a leader process.
        if let Some(read_ts) = write_ts.step_back() {
            // Only entries written at our exact target version and deploy generation count.
            let pred = |key: &migration_shard::Key| {
                key.build_version == self.target_version
                    && key.deploy_generation == Some(self.deploy_generation)
            };
            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
                for (key, shard_id) in entries {
                    let id = GlobalId::System(key.global_id);
                    if ids_to_replace.remove(&id) {
                        replaced_shards.insert(id, shard_id);
                    }
                }

                debug!(
                    %read_ts, ?replaced_shards, ?ids_to_replace,
                    "found existing entries in migration shard",
                );
            }

            // All requested IDs were covered by existing entries; no write needed.
            if ids_to_replace.is_empty() {
                return Ok(Some(replaced_shards));
            }
        }

        // Generate new shard IDs and attempt to insert them into the migration shard. If we get a
        // CaA failure at `write_ts` that means a concurrent process has inserted in the meantime
        // and we need to re-check the migration shard contents.
        let mut updates = Vec::new();
        for id in ids_to_replace {
            let shard_id = ShardId::new();
            replaced_shards.insert(id, shard_id);

            let GlobalId::System(global_id) = id else {
                bail!("attempt to migrate a non-system collection: {id}");
            };
            let key = migration_shard::Key {
                global_id,
                build_version: self.target_version.clone(),
                deploy_generation: Some(self.deploy_generation),
            };
            updates.push(((key, shard_id), write_ts, 1));
        }

        let upper = Antichain::from_elem(write_ts);
        let new_upper = Antichain::from_elem(write_ts.step_forward());
        debug!(%write_ts, "attempting insert into migration shard");
        let result = persist_write
            .compare_and_append(updates, upper, new_upper)
            .await
            .expect("valid usage");

        match result {
            Ok(()) => {
                debug!(
                    %write_ts, ?replaced_shards,
                    "successfully inserted into migration shard"
                );
                Ok(Some(replaced_shards))
            }
            // Lost the race with a concurrent writer; the caller retries from the new upper.
            Err(_mismatch) => Ok(None),
        }
    }
797
798    /// Open writer and reader for the migration shard.
799    async fn open_migration_shard(
800        &self,
801        diagnostics: Diagnostics,
802    ) -> (
803        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
804        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
805    ) {
806        let persist = &self.config.persist_client;
807
808        persist
809            .open(
810                self.migration_shard,
811                Arc::new(migration_shard::KeySchema),
812                Arc::new(ShardIdSchema),
813                diagnostics,
814                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
815            )
816            .await
817            .expect("valid usage")
818    }
819
820    /// Open a [`SinceHandle`] for the migration shard.
821    async fn open_migration_shard_since(
822        &self,
823        diagnostics: Diagnostics,
824    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
825        self.config
826            .persist_client
827            .open_critical_since(
828                self.migration_shard,
829                // TODO: We may need to use a different critical reader
830                // id for this if we want to be able to introspect it via SQL.
831                PersistClient::CONTROLLER_CRITICAL_SINCE,
832                Opaque::encode(&i64::MIN),
833                diagnostics.clone(),
834            )
835            .await
836            .expect("valid usage")
837    }
838
839    /// Update the fingerprints for `migrated_items`.
840    ///
841    /// Returns the new fingerprints. Also asserts that the current fingerprints of all other
842    /// system items match their builtin definitions.
843    fn update_fingerprints(
844        &self,
845        migrated_items: &BTreeSet<SystemObjectDescription>,
846    ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
847        let mut new_fingerprints = BTreeMap::new();
848        for (object, object_info) in &self.system_objects {
849            let id = object_info.global_id;
850            let builtin = object_info.builtin;
851
852            let fingerprint = builtin.fingerprint();
853            if fingerprint == object_info.fingerprint {
854                continue; // fingerprint unchanged, nothing to do
855            }
856
857            // Fingerprint mismatch is expected for a migrated item.
858            let migrated = migrated_items.contains(object);
859            // Some builtin types have schemas but no durable state. No migration needed for those.
860            let ephemeral = matches!(
861                builtin,
862                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
863            );
864
865            if migrated || ephemeral {
866                new_fingerprints.insert(object.clone(), fingerprint);
867            } else if builtin.runtime_alterable() {
868                // Runtime alterable builtins have no meaningful builtin fingerprint, and a
869                // sentinel value stored in the catalog.
870                assert_eq!(
871                    object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
872                    "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
873                );
874            } else {
875                panic!(
876                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
877                    fingerprint, object_info.fingerprint,
878                );
879            }
880        }
881
882        Ok(new_fingerprints)
883    }
884
    /// Perform cleanup of migration state, i.e. the migration shard.
    ///
    /// Returns a list of shards to finalize, and a `Future` that must be run after the shard
    /// finalization has been durably enqueued. The `Future` is used to remove entries from the
    /// migration shard only after we know the respective shards will be finalized. Removing
    /// entries immediately would risk leaking the shards.
    ///
    /// We only perform cleanup in leader mode, to keep the durable state changes made by read-only
    /// processes as minimal as possible. Given that Materialize doesn't support version
    /// downgrades, it is safe to assume that any state for versions below the `target_version` is
    /// not needed anymore and can be cleaned up.
    ///
    /// Note that it is fine for cleanup to sometimes fail or be skipped. The size of the migration
    /// shard should always be pretty small, so keeping migration state around for longer isn't a
    /// concern. As a result, we can keep the logic simple here and skip doing cleanup in response
    /// to transient failures, instead of retrying.
    async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
        let noop_action = async {}.boxed();
        let noop_result = (BTreeSet::new(), noop_action);

        // Cleanup mutates durable state, so only the leader process performs it (see doc
        // comment).
        if self.config.read_only {
            return Ok(noop_result);
        }

        let diagnostics = Diagnostics {
            shard_name: "builtin_migration".to_string(),
            handle_purpose: "builtin schema migration cleanup".into(),
        };
        let (mut persist_write, mut persist_read) =
            self.open_migration_shard(diagnostics.clone()).await;
        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;

        let upper = persist_write.fetch_recent_upper().await.clone();
        let write_ts = *upper.as_option().expect("migration shard not sealed");
        // `step_back` returns `None` when the upper is still the minimum timestamp, i.e. the
        // shard was never written to, so there is nothing to clean up.
        let Some(read_ts) = write_ts.step_back() else {
            return Ok(noop_result);
        };

        // Collect old entries to remove.
        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
        else {
            return Ok(noop_result);
        };

        debug!(
            ?stale_entries,
            "cleaning migration shard up to version {}", self.target_version,
        );

        // Shards that currently serve builtin collections. These must not be finalized, even if
        // they appear in stale migration shard entries.
        let current_shards: BTreeMap<_, _> = self
            .system_objects
            .values()
            .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
            .collect();

        let mut shards_to_finalize = BTreeSet::new();
        let mut retractions = Vec::new();
        for (key, shard_id) in stale_entries {
            // The migration shard contains both shards created during aborted upgrades and shards
            // created during successful upgrades. The latter may still be in use, so we have to
            // check and only finalize those that aren't anymore.
            let gid = GlobalId::System(key.global_id);
            if current_shards.get(&gid) != Some(&shard_id) {
                shards_to_finalize.insert(shard_id);
            }

            // Entries are retracted regardless of whether their shard is finalized: state for
            // versions below `target_version` is never needed again (see doc comment).
            retractions.push(((key, shard_id), write_ts, -1));
        }

        // The returned action appends the retractions at the upper observed above. If a
        // concurrent writer advanced the upper in the meantime, the compare-and-append fails and
        // cleanup is skipped, which is fine (see doc comment).
        let cleanup_action = async move {
            if !retractions.is_empty() {
                let new_upper = Antichain::from_elem(write_ts.step_forward());
                let result = persist_write
                    .compare_and_append(retractions, upper, new_upper)
                    .await
                    .expect("valid usage");
                match result {
                    Ok(()) => debug!("cleaned up migration shard"),
                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
                }
            }
        }
        .boxed();

        // Downgrade the since, to enable some compaction.
        let o = persist_since.opaque().clone();
        let new_since = Antichain::from_elem(read_ts);
        let result = persist_since
            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
            .await;
        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");

        Ok((shards_to_finalize, cleanup_action))
    }
980}
981
982/// Read the migration shard at the given timestamp, returning all entries that match the given
983/// predicate.
984///
985/// Returns `None` if the migration shard contains no matching entries, or if it isn't readable at
986/// `read_ts`.
987async fn read_migration_shard<P>(
988    persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
989    read_ts: Timestamp,
990    predicate: P,
991) -> Option<Vec<(migration_shard::Key, ShardId)>>
992where
993    P: for<'a> Fn(&migration_shard::Key) -> bool,
994{
995    let as_of = Antichain::from_elem(read_ts);
996    let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
997
998    assert!(
999        updates.iter().all(|(_, _, diff)| *diff == 1),
1000        "migration shard contains invalid diffs: {updates:?}",
1001    );
1002
1003    let entries: Vec<_> = updates
1004        .into_iter()
1005        .map(|(data, _, _)| data)
1006        .filter(move |(key, _)| predicate(key))
1007        .collect();
1008
1009    (!entries.is_empty()).then_some(entries)
1010}
1011
/// A plan to migrate between two versions.
///
/// Each entry names a builtin object whose persist schema must be migrated; the two lists
/// correspond to the two migration mechanisms described in the module docs.
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate using the `Evolution` mechanism.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate using the `Replacement` mechanism.
    replace: Vec<SystemObjectDescription>,
}
1020
/// Types and persist codec impls for the migration shard used by the `Replacement` mechanism.
mod migration_shard {
    use std::fmt;
    use std::str::FromStr;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;
    use semver::Version;
    use serde::{Deserialize, Serialize};

    /// Key type of the migration shard.
    ///
    /// Identifies a builtin collection (by its system `GlobalId` number) migrated at a specific
    /// build version and deploy generation.
    ///
    /// Keys are persisted as strings via the [`Codec`] impl below, which round-trips through
    /// [`fmt::Display`] and [`FromStr`]. The string encoding is durable state: both impls must
    /// remain able to produce/parse all historical formats.
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
    pub(super) struct Key {
        pub(super) global_id: u64,
        pub(super) build_version: Version,
        // Versions < 26.0 didn't include the deploy generation. As long as we still might
        // encounter migration shard entries that don't have it, we need to keep this an `Option`
        // and keep supporting both key formats.
        pub(super) deploy_generation: Option<u64>,
    }

    impl fmt::Display for Key {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            if self.deploy_generation.is_some() {
                // current format: a JSON object
                let s = serde_json::to_string(self).expect("JSON serializable");
                f.write_str(&s)
            } else {
                // pre-26.0 format: `<global_id>-<build_version>`
                write!(f, "{}-{}", self.global_id, self.build_version)
            }
        }
    }

    impl FromStr for Key {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, String> {
            // current format; pre-26.0 keys are not valid JSON encodings of `Key`, so trying
            // JSON first is unambiguous
            if let Ok(key) = serde_json::from_str(s) {
                return Ok(key);
            };

            // pre-26.0 format: split at the first `-` only, so any dashes within the version
            // string are preserved
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid Key '{s}'"));
            };
            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
            let build_version = build_version
                .parse::<Version>()
                .map_err(|e| e.to_string())?;
            Ok(Key {
                global_id,
                build_version,
                deploy_generation: None,
            })
        }
    }

    impl Default for Key {
        fn default() -> Self {
            // Placeholder values only; hand-written because `Version` provides no `Default`
            // impl of its own.
            Self {
                global_id: Default::default(),
                build_version: Version::new(0, 0, 0),
                deploy_generation: Some(0),
            }
        }
    }

    impl Codec for Key {
        type Schema = KeySchema;
        type Storage = ();

        fn codec_name() -> String {
            // NOTE(review): the codec name doesn't match the type name — presumably kept as
            // "TableKey" for compatibility with already-written shard metadata; confirm before
            // changing.
            "TableKey".into()
        }

        fn encode<B: BufMut>(&self, buf: &mut B) {
            // Keys are stored as their string representation; see `Display` above.
            buf.put(self.to_string().as_bytes())
        }

        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
            s.parse()
        }

        fn encode_schema(_schema: &KeySchema) -> Bytes {
            // `KeySchema` is a stateless unit struct, so there is nothing to encode.
            Bytes::new()
        }

        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            KeySchema
        }
    }

    impl SimpleColumnarData for Key {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            // Total byte length of all string values appended to the builder so far.
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }

        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }

        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            // Stored values were produced by `push` above, so they must parse back into a `Key`.
            *self = column.value(idx).parse().expect("valid Key");
        }
    }

    /// Persist schema for [`Key`]: a single string column.
    #[derive(Debug, PartialEq)]
    pub(super) struct KeySchema;

    impl Schema<Key> for KeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;
        type Decoder = SimpleColumnarDecoder<Key>;
        type Encoder = SimpleColumnarEncoder<Key>;

        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}
1162
1163#[cfg(test)]
1164#[path = "builtin_schema_migration_tests.rs"]
1165mod tests;