mz_adapter/catalog/open/
builtin_schema_migration.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for migrating the schemas of builtin storage collections.
11//!
12//! If a version upgrade changes the schema of a builtin collection that's made durable in persist,
13//! that persist shard's schema must be migrated accordingly. The migration must happen in a way
14//! that's compatible with 0dt upgrades: Read-only environments need to be able to read the
15//! collections with the new schema, without interfering with the leader environment's continued
16//! use of the old schema.
17//!
18//! Two migration mechanisms are provided:
19//!
20//!  * [`Mechanism::Evolution`] uses persist's schema evolution support to evolve the persist
21//!    shard's schema in-place. Only works for backward-compatible changes.
22//!  * [`Mechanism::Replacement`] creates a new shard to serve the builtin collection in the new
23//!    version. Works for all schema changes but discards existing data.
24//!
25//! Which mechanism to use is selected through entries in the `MIGRATIONS` list. In general, the
26//! `Evolution` mechanism should be used when possible, as it avoids data loss.
27//!
28//! For more context and details on the implementation, see
29//! `doc/developer/design/20251015_builtin_schema_migration.md`.
30
31use std::collections::{BTreeMap, BTreeSet};
32use std::sync::Arc;
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39    BUILTINS_STATIC, Builtin, Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION,
40    RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
41};
42use mz_catalog::config::BuiltinItemMigrationConfig;
43use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
44use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
45use mz_catalog::memory::error::{Error, ErrorKind};
46use mz_ore::soft_assert_or_log;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
48use mz_persist_client::critical::SinceHandle;
49use mz_persist_client::read::ReadHandle;
50use mz_persist_client::schema::CaESchema;
51use mz_persist_client::write::WriteHandle;
52use mz_persist_client::{Diagnostics, PersistClient};
53use mz_persist_types::ShardId;
54use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
55use mz_persist_types::schema::backward_compatible;
56use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
57use mz_repr::{CatalogItemId, GlobalId, Timestamp};
58use mz_sql::catalog::{CatalogItemType, NameReference};
59use mz_storage_client::controller::StorageTxn;
60use mz_storage_types::StorageDiff;
61use mz_storage_types::sources::SourceData;
62use semver::Version;
63use timely::progress::Antichain;
64use tracing::{debug, info};
65
66use crate::catalog::migrate::get_migration_version;
67
/// Builtin schema migrations required to upgrade to the current build version.
///
/// Each entry names the builtin object to migrate, the build version that requires the
/// migration, and the mechanism to use for it.
///
/// Migration steps for old versions must be retained according to the upgrade policy. For
/// example, if we support upgrading one major version at a time, the release of version `N.0.0`
/// can delete all migration steps with versions before `(N-1).0.0`.
///
/// Smallest supported version: 0.147.0
const MIGRATIONS: &[MigrationStep] = &[
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_sink_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 149, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_source_statistics_raw",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 159, 0),
        object: Object {
            type_: CatalogItemType::Source,
            schema: MZ_INTERNAL_SCHEMA,
            name: "mz_cluster_replica_metrics_history",
        },
        mechanism: Mechanism::Evolution,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_roles",
        },
        mechanism: Mechanism::Replacement,
    },
    MigrationStep {
        version: Version::new(0, 160, 0),
        object: Object {
            type_: CatalogItemType::Table,
            schema: MZ_CATALOG_SCHEMA,
            name: "mz_sinks",
        },
        mechanism: Mechanism::Replacement,
    },
];
122
/// A migration required to upgrade past a specific version.
///
/// Steps are declared in the `MIGRATIONS` list. When planning, only steps whose `version` is
/// greater than the version being migrated from are considered.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The build version that requires this migration.
    ///
    /// Must not be greater than the target (build) version; this is asserted when the steps
    /// are validated.
    version: Version,
    /// The object that requires migration.
    object: Object,
    /// The migration mechanism to be used.
    mechanism: Mechanism,
}
133
/// The mechanism to use to migrate the schema of a builtin collection.
///
/// When several steps for the same object request different mechanisms, planning selects
/// `Replacement` over `Evolution`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
// NOTE(review): both variants appear to be constructed in this file; confirm whether this
// `allow` is still needed.
#[allow(dead_code)]
enum Mechanism {
    /// Persist schema evolution.
    ///
    /// Keeps existing contents but only works for schema changes that are backward compatible
    /// according to [`backward_compatible`].
    Evolution,
    /// Shard replacement.
    ///
    /// Works for arbitrary schema changes but loses existing contents.
    Replacement,
}
148
/// The object of a migration.
///
/// This has the same information as [`SystemObjectDescription`] but can be constructed in `const`
/// contexts, like the `MIGRATIONS` list.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Object {
    /// The catalog item type of the builtin (maps to `object_type`).
    type_: CatalogItemType,
    /// The name of the schema containing the builtin (maps to `schema_name`).
    schema: &'static str,
    /// The name of the builtin itself (maps to `object_name`).
    name: &'static str,
}
159
160impl From<Object> for SystemObjectDescription {
161    fn from(object: Object) -> Self {
162        SystemObjectDescription {
163            schema_name: object.schema.into(),
164            object_type: object.type_,
165            object_name: object.name.into(),
166        }
167    }
168}
169
/// The result of a builtin schema migration.
///
/// The `Default` value describes a migration that did nothing: no replaced items and a no-op
/// cleanup action.
pub(super) struct MigrationResult {
    /// IDs of items whose shards have been replaced using the `Replacement` mechanism.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// A cleanup action to take once the migration has been made durable.
    pub cleanup_action: BoxFuture<'static, ()>,
}
177
178impl Default for MigrationResult {
179    fn default() -> Self {
180        Self {
181            replaced_items: Default::default(),
182            cleanup_action: async {}.boxed(),
183        }
184    }
185}
186
187/// Run builtin schema migrations.
188///
189/// This is the entry point used by adapter when opening the catalog. It uses the hardcoded
190/// `BUILTINS_STATIC` and `MIGRATIONS` lists to initialize the lists of available builtins and
191/// required migrations, respectively.
192pub(super) async fn run(
193    build_info: &BuildInfo,
194    deploy_generation: u64,
195    txn: &mut Transaction<'_>,
196    config: BuiltinItemMigrationConfig,
197) -> Result<MigrationResult, Error> {
198    // Sanity check to ensure we're not touching durable state in read-only mode.
199    assert_eq!(config.read_only, txn.is_savepoint());
200
201    // Tests may provide a dummy build info that confuses the migration step selection logic. Skip
202    // migrations if we observe this build info.
203    if *build_info == DUMMY_BUILD_INFO {
204        return Ok(MigrationResult::default());
205    }
206
207    let Some(durable_version) = get_migration_version(txn) else {
208        // New catalog; nothing to do.
209        return Ok(MigrationResult::default());
210    };
211    let build_version = build_info.semver_version();
212
213    let builtins = BUILTINS_STATIC
214        .iter()
215        .map(|builtin| {
216            let object = SystemObjectDescription {
217                schema_name: builtin.schema().to_string(),
218                object_type: builtin.catalog_item_type(),
219                object_name: builtin.name().to_string(),
220            };
221            (object, builtin)
222        })
223        .collect();
224
225    let migration = Migration::new(
226        durable_version.clone(),
227        build_version.clone(),
228        deploy_generation,
229        txn,
230        builtins,
231        config,
232    );
233
234    let result = migration.run(MIGRATIONS).await.map_err(|e| {
235        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
236            last_seen_version: durable_version.to_string(),
237            this_version: build_version.to_string(),
238            cause: e.to_string(),
239        })
240    })?;
241
242    Ok(result)
243}
244
/// Context of a builtin schema migration.
struct Migration<'a, 'b> {
    /// The version we are migrating from.
    ///
    /// Same as the build version of the most recent leader process that successfully performed
    /// migrations.
    source_version: Version,
    /// The version we are migrating to.
    ///
    /// Same as the build version of this process.
    target_version: Version,
    /// The deploy generation of this process. Recorded in the keys written to the migration
    /// shard, alongside the target version.
    deploy_generation: u64,
    /// The catalog transaction through which catalog state is read and updated.
    txn: &'a mut Transaction<'b>,
    /// The available builtin definitions, keyed by their system object description.
    builtins: BTreeMap<SystemObjectDescription, &'static Builtin<NameReference>>,
    /// The identifiers (catalog ID, global ID, fingerprint) of system objects, as read from the
    /// system object mappings in `txn` when the context was created.
    object_ids: BTreeMap<SystemObjectDescription, SystemObjectUniqueIdentifier>,
    /// Migration configuration, including the persist client, the read-only flag, and an
    /// optional forced migration mechanism.
    config: BuiltinItemMigrationConfig,
}
262
263impl<'a, 'b> Migration<'a, 'b> {
264    fn new(
265        source_version: Version,
266        target_version: Version,
267        deploy_generation: u64,
268        txn: &'a mut Transaction<'b>,
269        builtins: BTreeMap<SystemObjectDescription, &'static Builtin<NameReference>>,
270        config: BuiltinItemMigrationConfig,
271    ) -> Self {
272        let object_ids = txn
273            .get_system_object_mappings()
274            .map(|m| (m.description, m.unique_identifier))
275            .collect();
276
277        Self {
278            source_version,
279            target_version,
280            deploy_generation,
281            txn,
282            builtins,
283            object_ids,
284            config,
285        }
286    }
287
288    async fn run(mut self, steps: &[MigrationStep]) -> anyhow::Result<MigrationResult> {
289        info!(
290            deploy_generation = %self.deploy_generation,
291            "running builtin schema migration: {} -> {}",
292            self.source_version, self.target_version
293        );
294
295        self.validate_migration_steps(steps);
296
297        let (force, plan) = match self.config.force_migration.as_deref() {
298            None => (false, self.plan_migration(steps)),
299            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
300            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
301            Some(other) => panic!("unknown force migration mechanism: {other}"),
302        };
303
304        if self.source_version == self.target_version && !force {
305            info!("skipping migration: already at target version");
306            return Ok(Default::default());
307        } else if self.source_version > self.target_version {
308            bail!("downgrade not supported");
309        }
310
311        // In leader mode, upgrade the version of the migration shard to the target version.
312        // This fences out any readers at lower versions.
313        if !self.config.read_only {
314            self.upgrade_migration_shard_version().await;
315        }
316
317        info!("executing migration plan: {plan:?}");
318
319        self.migrate_evolve(&plan.evolve).await?;
320        self.migrate_replace(&plan.replace).await?;
321
322        let mut migrated_items = BTreeSet::new();
323        let mut replaced_items = BTreeSet::new();
324        for object in &plan.evolve {
325            let id = self.object_ids[object].catalog_id;
326            migrated_items.insert(id);
327        }
328        for object in &plan.replace {
329            let id = self.object_ids[object].catalog_id;
330            migrated_items.insert(id);
331            replaced_items.insert(id);
332        }
333
334        self.update_fingerprints(&migrated_items)?;
335
336        let cleanup_action = self.cleanup().await?;
337
338        Ok(MigrationResult {
339            replaced_items,
340            cleanup_action,
341        })
342    }
343
344    /// Sanity check the given migration steps.
345    ///
346    /// If any of these checks fail, that's a bug in Materialize, and we panic immediately.
347    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
348        for step in steps {
349            assert!(
350                step.version <= self.target_version,
351                "migration step version greater than target version: {} > {}",
352                step.version,
353                self.target_version,
354            );
355
356            let object = SystemObjectDescription::from(step.object.clone());
357
358            // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly, it would
359            // cause the table to be truncated because the contents are not also stored in the durable
360            // catalog. Secondly, we prune `mz_storage_usage_by_shard` of old events in the background
361            // on startup. The correctness of that pruning relies on there being no other retractions
362            // to `mz_storage_usage_by_shard`.
363            //
364            // TODO: Confirm the above reasoning, it might be outdated?
365            assert_ne!(
366                *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
367                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
368            );
369
370            let Some(builtin) = self.builtins.get(&object) else {
371                panic!("migration step for non-existent builtin: {object:?}");
372            };
373
374            use Builtin::*;
375            assert!(
376                matches!(builtin, Table(..) | Source(..) | ContinualTask(..)),
377                "schema migration not supported for builtin: {builtin:?}",
378            );
379        }
380    }
381
382    /// Select for each object to migrate the appropriate migration mechanism.
383    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
384        // Ignore any steps at versions before `source_version`.
385        let steps = steps.iter().filter(|s| s.version > self.source_version);
386
387        // Select a mechanism for each object, according to the requested migrations:
388        //  * If any `Replacement` was requested, use `Replacement`.
389        //  * Otherwise, (i.e. only `Evolution` was requested), use `Evolution`.
390        let mut by_object = BTreeMap::new();
391        for step in steps {
392            if let Some(entry) = by_object.get_mut(&step.object) {
393                *entry = match (step.mechanism, *entry) {
394                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
395                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
396                        Mechanism::Replacement
397                    }
398                };
399            } else {
400                by_object.insert(step.object.clone(), step.mechanism);
401            }
402        }
403
404        let mut plan = Plan::default();
405        for (object, mechanism) in by_object {
406            match mechanism {
407                Mechanism::Evolution => plan.evolve.push(object.into()),
408                Mechanism::Replacement => plan.replace.push(object.into()),
409            }
410        }
411
412        plan
413    }
414
415    /// Plan a forced migration of all objects using the given mechanism.
416    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
417        let objects = self
418            .builtins
419            .iter()
420            .filter(|(_, builtin)| matches!(builtin, Builtin::Table(..) | Builtin::Source(..)))
421            .map(|(object, _)| object.clone())
422            .collect();
423
424        let mut plan = Plan::default();
425        match mechanism {
426            Mechanism::Evolution => plan.evolve = objects,
427            Mechanism::Replacement => plan.replace = objects,
428        }
429
430        plan
431    }
432
433    /// Upgrade the migration shard to the target version.
434    async fn upgrade_migration_shard_version(&self) {
435        let persist = &self.config.persist_client;
436        let shard_id = self.txn.get_builtin_migration_shard().expect("must exist");
437        let diagnostics = Diagnostics {
438            shard_name: "builtin_migration".to_string(),
439            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
440        };
441
442        persist
443            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
444                shard_id,
445                diagnostics,
446            )
447            .await
448            .expect("valid usage");
449    }
450
451    /// Migrate the given objects using the `Evolution` mechanism.
452    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
453        for object in objects {
454            self.migrate_evolve_one(object).await?;
455        }
456        Ok(())
457    }
458
    /// Migrate a single object using the `Evolution` mechanism.
    ///
    /// In read-only mode this only checks that the builtin's new schema is backward compatible
    /// with the latest schema registered for the object's shard (if any); nothing is registered.
    /// In leader mode the new schema is registered if the shard has no schema yet, and otherwise
    /// the shard's schema is evolved via compare-and-evolve, retrying on concurrent changes.
    ///
    /// # Errors
    ///
    /// Fails if the builtin or its ID is unknown, if the builtin is not a table, source, or
    /// continual task, if no shard is registered for it in read-only mode, or if the schema
    /// change is not backward compatible.
    ///
    /// # Panics
    ///
    /// Panics if persist reports invalid usage for any of the schema operations.
    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
        let collection_metadata = self.txn.get_collection_metadata();
        let persist = &self.config.persist_client;

        // Resolve the builtin definition, its global ID, and the shard backing it.
        let Some(builtin) = self.builtins.get(object) else {
            bail!("missing builtin {object:?}");
        };
        let Some(id) = self.object_ids.get(object).map(|i| i.global_id) else {
            bail!("missing id for builtin {object:?}");
        };
        let Some(&shard_id) = collection_metadata.get(&id) else {
            // No shard is registered for this builtin. In leader mode, this is fine, we'll
            // register the shard during bootstrap. In read-only mode, we might be racing with the
            // leader to register the shard and it's unclear what sort of confusion can arise from
            // that -- better to bail out in this case.
            if self.config.read_only {
                bail!("missing collection metadata for builtin {object:?} ({id})");
            } else {
                return Ok(());
            }
        };

        // The schema we want the shard to end up with. Only builtins that carry a relation
        // description here can be evolved.
        let target_desc = match builtin {
            Builtin::Table(table) => &table.desc,
            Builtin::Source(source) => &source.desc,
            Builtin::ContinualTask(ct) => &ct.desc,
            _ => bail!("not a storage collection: {builtin:?}"),
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
        };
        // The most recent schema registered for the shard, or `None` if there is none yet.
        let source_schema = persist
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("valid usage");

        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");

        if self.config.read_only {
            // In read-only mode, only check that the new schema is backward compatible.
            // We'll register it when/if we restart in leader mode.
            if let Some((_, source_desc, _)) = &source_schema {
                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
                if backward_compatible(&old, &new).is_none() {
                    bail!(
                        "incompatible schema evolution for {object:?}: \
                         {source_desc:?} -> {target_desc:?}"
                    );
                }
            }

            return Ok(());
        }

        // Leader mode: determine the schema to evolve from. `schema_id` and `source_desc` are
        // updated below whenever a concurrent schema change is observed.
        let (mut schema_id, mut source_desc) = match source_schema {
            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
            None => {
                // If no schema was previously registered, simply try to register the new one. This
                // might fail due to a concurrent registration, in which case we'll fall back to
                // `compare_and_evolve_schema`.

                debug!(%id, %shard_id, "no previous schema found; registering initial one");
                let schema_id = persist
                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        target_desc,
                        &UnitSchema,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage");
                // A returned schema ID means the target schema is now registered; we're done.
                if schema_id.is_some() {
                    return Ok(());
                }

                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
                // Registration failed, so some schema must have been registered in the
                // meantime; fetch it to use as the starting point for CaES.
                let (schema_id, source_desc, _) = persist
                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
                        shard_id,
                        diagnostics.clone(),
                    )
                    .await
                    .expect("valid usage")
                    .expect("known to exist");

                (schema_id, source_desc)
            }
        };

        loop {
            // Evolving the schema might fail if another process evolved the schema concurrently,
            // in which case we need to retry. Most likely the other process evolved the schema to
            // our own target schema and the second try is a no-op.

            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
            let result = persist
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    target_desc,
                    &UnitSchema,
                    diagnostics.clone(),
                )
                .await
                .expect("valid usage");

            match result {
                CaESchema::Ok(schema_id) => {
                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
                    break;
                }
                CaESchema::Incompatible => bail!(
                    "incompatible schema evolution for {object:?}: \
                     {source_desc:?} -> {target_desc:?}"
                ),
                // The expected schema ID was stale: adopt the reported current schema and retry.
                CaESchema::ExpectedMismatch {
                    schema_id: new_id,
                    key,
                    val: UnitSchema,
                } => {
                    schema_id = new_id;
                    source_desc = key;
                }
            }
        }

        Ok(())
    }
590
591    /// Migrate the given objects using the `Replacement` mechanism.
592    async fn migrate_replace(&mut self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
593        if objects.is_empty() {
594            return Ok(Default::default());
595        }
596
597        let diagnostics = Diagnostics {
598            shard_name: "builtin_migration".to_string(),
599            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
600        };
601        let (mut persist_write, mut persist_read) =
602            self.open_migration_shard(diagnostics.clone()).await;
603
604        let mut ids_to_replace = BTreeSet::new();
605        for object in objects {
606            if let Some(ids) = self.object_ids.get(object) {
607                ids_to_replace.insert(ids.global_id);
608            } else {
609                bail!("missing id for builtin {object:?}");
610            }
611        }
612
613        info!(?objects, ?ids_to_replace, "migrating by replacement");
614
615        // Fetch replacement shard IDs from the migration shard, or insert new ones if none exist.
616        // This can fail due to writes by concurrent processes, so we need to retry.
617        let replaced_shards = loop {
618            if let Some(shards) = self
619                .try_get_or_insert_replacement_shards(
620                    &ids_to_replace,
621                    &mut persist_write,
622                    &mut persist_read,
623                )
624                .await?
625            {
626                break shards;
627            }
628        };
629
630        // Update the collection metadata in the transaction and enqueue old shards for
631        // finalization.
632        let old = self.txn.delete_collection_metadata(ids_to_replace);
633        let old_shards = old.into_iter().map(|(_, shard_id)| shard_id).collect();
634        self.txn.insert_unfinalized_shards(old_shards)?;
635        self.txn.insert_collection_metadata(replaced_shards)?;
636
637        Ok(())
638    }
639
    /// Try to get or insert replacement shards for the given IDs into the migration shard, at
    /// `target_version` and `deploy_generation`.
    ///
    /// This method looks for existing entries in the migration shard and reuses those that are
    /// present. For all remaining IDs it generates new shard IDs and tries to insert them.
    ///
    /// The result of this call is `None` if existing entries were not found for all of the given
    /// IDs and inserting new ones failed because of a concurrent write to the migration shard.
    /// In this case, the caller is expected to retry.
    ///
    /// # Errors
    ///
    /// Fails if one of the IDs that needs a new shard is not a system ID.
    ///
    /// # Panics
    ///
    /// Panics if the migration shard's upper is empty, or if persist reports invalid usage.
    async fn try_get_or_insert_replacement_shards(
        &self,
        ids_to_replace: &BTreeSet<GlobalId>,
        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
        // The current upper is the timestamp at which we will attempt to write.
        let upper = persist_write.fetch_recent_upper().await;
        let write_ts = *upper.as_option().expect("migration shard not sealed");

        // Working copy: IDs are removed from it as existing replacement shards are found.
        let mut ids_to_replace = ids_to_replace.clone();
        let mut replaced_shards = BTreeMap::new();

        // Another process might already have done a shard replacement at our version and
        // generation, in which case we can directly reuse the replacement shards.
        //
        // Note that we can't assume that the previous process had the same `ids_to_replace` as we
        // do. The set of migrations to run depends on both the source and the target version, and
        // the migration shard is not keyed by source version. The previous writer might have seen
        // a different source version, if there was a concurrent migration by a leader process.
        if let Some(read_ts) = write_ts.step_back() {
            // Only entries written for our exact target version and deploy generation qualify.
            let pred = |key: &migration_shard::Key| {
                key.build_version == self.target_version
                    && key.deploy_generation == Some(self.deploy_generation)
            };
            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
                for (key, shard_id) in entries {
                    let id = GlobalId::System(key.global_id);
                    // Entries for IDs we were not asked to replace are ignored.
                    if ids_to_replace.remove(&id) {
                        replaced_shards.insert(id, shard_id);
                    }
                }

                debug!(
                    %read_ts, ?replaced_shards, ?ids_to_replace,
                    "found existing entries in migration shard",
                );
            }

            // Everything we need already exists; no write required.
            if ids_to_replace.is_empty() {
                return Ok(Some(replaced_shards));
            }
        }

        // Generate new shard IDs and attempt to insert them into the migration shard. If we get a
        // CaA failure at `write_ts` that means a concurrent process has inserted in the meantime
        // and we need to re-check the migration shard contents.
        let mut updates = Vec::new();
        for id in ids_to_replace {
            let shard_id = ShardId::new();
            replaced_shards.insert(id, shard_id);

            let GlobalId::System(global_id) = id else {
                bail!("attempt to migrate a non-system collection: {id}");
            };
            let key = migration_shard::Key {
                global_id,
                build_version: self.target_version.clone(),
                deploy_generation: Some(self.deploy_generation),
            };
            updates.push(((key, shard_id), write_ts, 1));
        }

        // Append exactly at `write_ts`, advancing the upper by one step on success.
        let upper = Antichain::from_elem(write_ts);
        let new_upper = Antichain::from_elem(write_ts.step_forward());
        debug!(%write_ts, "attempting insert into migration shard");
        let result = persist_write
            .compare_and_append(updates, upper, new_upper)
            .await
            .expect("valid usage");

        match result {
            Ok(()) => {
                debug!(
                    %write_ts, ?replaced_shards,
                    "successfully inserted into migration shard"
                );
                Ok(Some(replaced_shards))
            }
            // The upper moved under us; signal the caller to retry.
            Err(_mismatch) => Ok(None),
        }
    }
730
731    /// Open writer and reader for the migration shard.
732    async fn open_migration_shard(
733        &self,
734        diagnostics: Diagnostics,
735    ) -> (
736        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
737        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
738    ) {
739        let persist = &self.config.persist_client;
740        let shard_id = self.txn.get_builtin_migration_shard().expect("must exist");
741
742        persist
743            .open(
744                shard_id,
745                Arc::new(migration_shard::KeySchema),
746                Arc::new(ShardIdSchema),
747                diagnostics,
748                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
749            )
750            .await
751            .expect("valid usage")
752    }
753
754    /// Open a [`SinceHandle`] for the migration shard.
755    async fn open_migration_shard_since(
756        &self,
757        diagnostics: Diagnostics,
758    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff, i64> {
759        let persist = &self.config.persist_client;
760        let shard_id = self.txn.get_builtin_migration_shard().expect("must exist");
761
762        persist
763            .open_critical_since(
764                shard_id,
765                // TODO: We may need to use a different critical reader
766                // id for this if we want to be able to introspect it via SQL.
767                PersistClient::CONTROLLER_CRITICAL_SINCE,
768                diagnostics.clone(),
769            )
770            .await
771            .expect("valid usage")
772    }
773
774    /// Update the fingerprints stored for `migrated_items` in the catalog.
775    ///
776    /// Also asserts that the stored fingerprints of all other system items match their builtin
777    /// definitions.
778    fn update_fingerprints(
779        &mut self,
780        migrated_items: &BTreeSet<CatalogItemId>,
781    ) -> anyhow::Result<()> {
782        let mut updates = BTreeMap::new();
783        for (object, ids) in &self.object_ids {
784            let Some(builtin) = self.builtins.get(object) else {
785                bail!("missing builtin {object:?}");
786            };
787
788            let id = ids.catalog_id;
789            let fingerprint = builtin.fingerprint();
790            if fingerprint == ids.fingerprint {
791                continue; // fingerprint unchanged, nothing to do
792            }
793
794            // Fingerprint mismatch is expected for a migrated item.
795            let migrated = migrated_items.contains(&id);
796            // Some builtin types have schemas but no durable state. No migration needed for those.
797            let ephemeral = matches!(
798                builtin,
799                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
800            );
801
802            if migrated || ephemeral {
803                let new_mapping = SystemObjectMapping {
804                    description: object.clone(),
805                    unique_identifier: SystemObjectUniqueIdentifier {
806                        catalog_id: ids.catalog_id,
807                        global_id: ids.global_id,
808                        fingerprint,
809                    },
810                };
811                updates.insert(id, new_mapping);
812            } else if builtin.runtime_alterable() {
813                // Runtime alterable builtins have no meaningful builtin fingerprint, and a
814                // sentinel value stored in the catalog.
815                assert_eq!(
816                    ids.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
817                    "fingerprint mismatch for runtime-alterable builtin {builtin:?} ({id})",
818                );
819            } else {
820                panic!(
821                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
822                    fingerprint, ids.fingerprint,
823                );
824            }
825        }
826
827        self.txn.update_system_object_mappings(updates)?;
828
829        Ok(())
830    }
831
832    /// Perform cleanup of migration state, i.e. the migration shard.
833    ///
834    /// Returns a `Future` that must be run after the `txn` has been successfully committed to
835    /// durable state. This is used to remove entries from the migration shard only after we know
836    /// the respective shards will be finalized. Removing entries immediately would risk leaking
837    /// the shards.
838    ///
839    /// We only perform cleanup in leader mode, to keep the durable state changes made by read-only
840    /// processes a minimal as possible. Given that Materialize doesn't support version downgrades,
841    /// it is safe to assume that any state for versions below the `target_version` is not needed
842    /// anymore and can be cleaned up.
843    ///
844    /// Note that it is fine for cleanup to sometimes fail or be skipped. The size of the migration
845    /// shard should always be pretty small, so keeping migration state around for longer isn't a
846    /// concern. As a result, we can keep the logic simple here and skip doing cleanup in response
847    /// to transient failures, instead of retrying.
848    async fn cleanup(&mut self) -> anyhow::Result<BoxFuture<'static, ()>> {
849        let noop_action = async {}.boxed();
850
851        if self.config.read_only {
852            return Ok(noop_action);
853        }
854
855        let collection_metadata = self.txn.get_collection_metadata();
856        let diagnostics = Diagnostics {
857            shard_name: "builtin_migration".to_string(),
858            handle_purpose: "builtin schema migration cleanup".into(),
859        };
860        let (mut persist_write, mut persist_read) =
861            self.open_migration_shard(diagnostics.clone()).await;
862        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;
863
864        let upper = persist_write.fetch_recent_upper().await.clone();
865        let write_ts = *upper.as_option().expect("migration shard not sealed");
866        let Some(read_ts) = write_ts.step_back() else {
867            return Ok(noop_action);
868        };
869
870        // Collect old entries to remove.
871        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
872        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
873        else {
874            return Ok(noop_action);
875        };
876
877        debug!(
878            ?stale_entries,
879            "cleaning migration shard up to version {}", self.target_version,
880        );
881
882        let mut unfinalized_shards = BTreeSet::new();
883        let mut retractions = Vec::new();
884        for (key, shard_id) in stale_entries {
885            // The migration shard contains both shards created during aborted upgrades and shards
886            // created during successful upgrades. The latter may still be in use, so we have to
887            // check and only finalize those that aren't anymore.
888            let gid = GlobalId::System(key.global_id);
889            if collection_metadata.get(&gid) != Some(&shard_id) {
890                unfinalized_shards.insert(shard_id);
891            }
892
893            retractions.push(((key, shard_id), write_ts, -1));
894        }
895
896        // Arrange for shard finalization and subsequent removal of the migration shard entries.
897        self.txn.insert_unfinalized_shards(unfinalized_shards)?;
898        let cleanup_action = async move {
899            if !retractions.is_empty() {
900                let new_upper = Antichain::from_elem(write_ts.step_forward());
901                let result = persist_write
902                    .compare_and_append(retractions, upper, new_upper)
903                    .await
904                    .expect("valid usage");
905                match result {
906                    Ok(()) => debug!("cleaned up migration shard"),
907                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
908                }
909            }
910        }
911        .boxed();
912
913        // Downgrade the since, to enable some compaction.
914        let o = *persist_since.opaque();
915        let new_since = Antichain::from_elem(read_ts);
916        let result = persist_since
917            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
918            .await;
919        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");
920
921        Ok(cleanup_action)
922    }
923}
924
925/// Read the migration shard at the given timestamp, returning all entries that match the given
926/// predicate.
927///
928/// Returns `None` if the migration shard contains no matching entries, or if it isn't readable at
929/// `read_ts`.
930async fn read_migration_shard<P>(
931    persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
932    read_ts: Timestamp,
933    predicate: P,
934) -> Option<Vec<(migration_shard::Key, ShardId)>>
935where
936    P: for<'a> Fn(&migration_shard::Key) -> bool,
937{
938    let as_of = Antichain::from_elem(read_ts);
939    let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
940
941    assert!(
942        updates.iter().all(|(_, _, diff)| *diff == 1),
943        "migration shard contains invalid diffs: {updates:?}",
944    );
945
946    let entries: Vec<_> = updates
947        .into_iter()
948        .filter_map(|(data, _, _)| {
949            if let (Ok(key), Ok(val)) = data {
950                Some((key, val))
951            } else {
952                // If we can't decode the data, it has likely been written by a newer version, so
953                // we ignore it.
954                info!("skipping unreadable migration shard entry: {data:?}");
955                None
956            }
957        })
958        .filter(move |(key, _)| predicate(key))
959        .collect();
960
961    (!entries.is_empty()).then_some(entries)
962}
963
/// A plan to migrate between two versions.
///
/// Lists the objects to migrate, grouped by migration mechanism.
#[derive(Debug, Default)]
struct Plan {
    /// Objects to migrate using the `Evolution` mechanism, i.e. by evolving the schema of the
    /// existing persist shard in-place.
    evolve: Vec<SystemObjectDescription>,
    /// Objects to migrate using the `Replacement` mechanism, i.e. by creating a new shard, which
    /// discards existing data.
    replace: Vec<SystemObjectDescription>,
}
972
973/// Types and persist codec impls for the migration shard used by the `Replacement` mechanism.
974mod migration_shard {
975    use std::fmt;
976    use std::str::FromStr;
977
978    use arrow::array::{StringArray, StringBuilder};
979    use bytes::{BufMut, Bytes};
980    use mz_persist_types::Codec;
981    use mz_persist_types::codec_impls::{
982        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
983    };
984    use mz_persist_types::columnar::Schema;
985    use mz_persist_types::stats::NoneStats;
986    use semver::Version;
987    use serde::{Deserialize, Serialize};
988
989    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
990    pub(super) struct Key {
991        pub(super) global_id: u64,
992        pub(super) build_version: Version,
993        // Versions < 26.0 didn't include the deploy generation. As long as we still might
994        // encounter migration shard entries that don't have it, we need to keep this an `Option`
995        // and keep supporting both key formats.
996        pub(super) deploy_generation: Option<u64>,
997    }
998
999    impl fmt::Display for Key {
1000        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1001            if self.deploy_generation.is_some() {
1002                // current format
1003                let s = serde_json::to_string(self).expect("JSON serializable");
1004                f.write_str(&s)
1005            } else {
1006                // pre-26.0 format
1007                write!(f, "{}-{}", self.global_id, self.build_version)
1008            }
1009        }
1010    }
1011
1012    impl FromStr for Key {
1013        type Err = String;
1014
1015        fn from_str(s: &str) -> Result<Self, String> {
1016            // current format
1017            if let Ok(key) = serde_json::from_str(s) {
1018                return Ok(key);
1019            };
1020
1021            // pre-26.0 format
1022            let parts: Vec<_> = s.splitn(2, '-').collect();
1023            let &[global_id, build_version] = parts.as_slice() else {
1024                return Err(format!("invalid Key '{s}'"));
1025            };
1026            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
1027            let build_version = build_version
1028                .parse::<Version>()
1029                .map_err(|e| e.to_string())?;
1030            Ok(Key {
1031                global_id,
1032                build_version,
1033                deploy_generation: None,
1034            })
1035        }
1036    }
1037
1038    impl Default for Key {
1039        fn default() -> Self {
1040            Self {
1041                global_id: Default::default(),
1042                build_version: Version::new(0, 0, 0),
1043                deploy_generation: Some(0),
1044            }
1045        }
1046    }
1047
1048    impl Codec for Key {
1049        type Schema = KeySchema;
1050        type Storage = ();
1051
1052        fn codec_name() -> String {
1053            "TableKey".into()
1054        }
1055
1056        fn encode<B: BufMut>(&self, buf: &mut B) {
1057            buf.put(self.to_string().as_bytes())
1058        }
1059
1060        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
1061            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
1062            s.parse()
1063        }
1064
1065        fn encode_schema(_schema: &KeySchema) -> Bytes {
1066            Bytes::new()
1067        }
1068
1069        fn decode_schema(buf: &Bytes) -> Self::Schema {
1070            assert_eq!(*buf, Bytes::new());
1071            KeySchema
1072        }
1073    }
1074
1075    impl SimpleColumnarData for Key {
1076        type ArrowBuilder = StringBuilder;
1077        type ArrowColumn = StringArray;
1078
1079        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
1080            builder.values_slice().len()
1081        }
1082
1083        fn push(&self, builder: &mut Self::ArrowBuilder) {
1084            builder.append_value(&self.to_string());
1085        }
1086
1087        fn push_null(builder: &mut Self::ArrowBuilder) {
1088            builder.append_null();
1089        }
1090
1091        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
1092            *self = column.value(idx).parse().expect("valid Key");
1093        }
1094    }
1095
1096    #[derive(Debug, PartialEq)]
1097    pub(super) struct KeySchema;
1098
1099    impl Schema<Key> for KeySchema {
1100        type ArrowColumn = StringArray;
1101        type Statistics = NoneStats;
1102        type Decoder = SimpleColumnarDecoder<Key>;
1103        type Encoder = SimpleColumnarEncoder<Key>;
1104
1105        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
1106            Ok(SimpleColumnarEncoder::default())
1107        }
1108
1109        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
1110            Ok(SimpleColumnarDecoder::new(col))
1111        }
1112    }
1113}