Skip to main content

mz_adapter/catalog/open/
builtin_schema_migration.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for migrating the schemas of builtin storage collections.
11//!
12//! If a version upgrade changes the schema of a builtin collection that's made durable in persist,
13//! that persist shard's schema must be migrated accordingly. The migration must happen in a way
14//! that's compatible with 0dt upgrades: Read-only environments need to be able to read the
15//! collections with the new schema, without interfering with the leader environment's continued
16//! use of the old schema.
17//!
18//! Two migration mechanisms are provided:
19//!
20//!  * [`Mechanism::Evolution`] uses persist's schema evolution support to evolve the persist
21//!    shard's schema in-place. Only works for backward-compatible changes.
22//!  * [`Mechanism::Replacement`] creates a new shard to serve the builtin collection in the new
23//!    version. Works for all schema changes but discards existing data.
24//!
25//! Which mechanism to use is selected through entries in the `MIGRATIONS` list. In general, the
26//! `Evolution` mechanism should be used when possible, as it avoids data loss.
27//!
28//! For more context and details on the implementation, see
29//! `doc/developer/design/20251015_builtin_schema_migration.md`.
30
31use std::collections::{BTreeMap, BTreeSet};
32use std::sync::{Arc, LazyLock};
33
34use anyhow::bail;
35use futures::FutureExt;
36use futures::future::BoxFuture;
37use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
38use mz_catalog::builtin::{
39    BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
40    MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, MZ_STORAGE_USAGE_BY_SHARD,
41    MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
42};
43use mz_catalog::config::BuiltinItemMigrationConfig;
44use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
45use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction};
46use mz_catalog::memory::error::{Error, ErrorKind};
47use mz_ore::soft_assert_or_log;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
49use mz_persist_client::critical::{Opaque, SinceHandle};
50use mz_persist_client::read::ReadHandle;
51use mz_persist_client::schema::CaESchema;
52use mz_persist_client::write::WriteHandle;
53use mz_persist_client::{Diagnostics, PersistClient};
54use mz_persist_types::ShardId;
55use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
56use mz_persist_types::schema::backward_compatible;
57use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA};
58use mz_repr::{CatalogItemId, GlobalId, Timestamp};
59use mz_sql::catalog::{CatalogItemType, NameReference};
60use mz_storage_client::controller::StorageTxn;
61use mz_storage_types::StorageDiff;
62use mz_storage_types::sources::SourceData;
63use semver::Version;
64use timely::progress::Antichain;
65use tracing::{debug, info};
66
67use crate::catalog::migrate::get_migration_version;
68
69/// Builtin schema migrations required to upgrade to the current build version.
70///
71/// Migration steps for old versions must be retained around according to the upgrade policy. For
72/// example, if we support upgrading one major version at a time, the release of version `N.0.0`
73/// can delete all migration steps with versions before `(N-1).0.0`.
74///
75/// Exception: when a builtin's `SystemObjectDescription` changes — e.g. a builtin table is
76/// converted to a materialized view (see `migrate_builtin_tables_to_mvs`), or a builtin is
77/// renamed or removed — existing steps naming the old description must be removed regardless
78/// of version, because `validate_migration_steps` panics on steps that don't resolve to a
79/// current builtin. This is safe only if a `Replacement` step for the new description is added
80/// at the conversion version: every environment that needed the removed steps upgrades from an
81/// even older version, so the new replacement subsumes them.
82///
83/// Smallest supported version: 0.147.0
84static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
85    vec![
86        MigrationStep::replacement(
87            "0.149.0",
88            CatalogItemType::Source,
89            MZ_INTERNAL_SCHEMA,
90            "mz_sink_statistics_raw",
91        ),
92        MigrationStep::replacement(
93            "0.149.0",
94            CatalogItemType::Source,
95            MZ_INTERNAL_SCHEMA,
96            "mz_source_statistics_raw",
97        ),
98        MigrationStep::evolution(
99            "0.159.0",
100            CatalogItemType::Source,
101            MZ_INTERNAL_SCHEMA,
102            "mz_cluster_replica_metrics_history",
103        ),
104        MigrationStep::replacement(
105            "0.160.0",
106            CatalogItemType::Table,
107            MZ_CATALOG_SCHEMA,
108            "mz_sinks",
109        ),
110        MigrationStep::replacement(
111            "26.18.0-dev.0",
112            CatalogItemType::MaterializedView,
113            MZ_CATALOG_SCHEMA,
114            "mz_databases",
115        ),
116        MigrationStep::replacement(
117            "26.19.0-dev.0",
118            CatalogItemType::MaterializedView,
119            MZ_CATALOG_SCHEMA,
120            "mz_schemas",
121        ),
122        MigrationStep::replacement(
123            "26.19.0-dev.0",
124            CatalogItemType::MaterializedView,
125            MZ_CATALOG_SCHEMA,
126            "mz_role_members",
127        ),
128        MigrationStep::replacement(
129            "26.19.0-dev.0",
130            CatalogItemType::MaterializedView,
131            MZ_INTERNAL_SCHEMA,
132            "mz_network_policies",
133        ),
134        MigrationStep::replacement(
135            "26.19.0-dev.0",
136            CatalogItemType::MaterializedView,
137            MZ_INTERNAL_SCHEMA,
138            "mz_network_policy_rules",
139        ),
140        MigrationStep::replacement(
141            "26.19.0-dev.0",
142            CatalogItemType::MaterializedView,
143            MZ_INTERNAL_SCHEMA,
144            "mz_cluster_workload_classes",
145        ),
146        MigrationStep::replacement(
147            "26.19.0-dev.0",
148            CatalogItemType::MaterializedView,
149            MZ_INTERNAL_SCHEMA,
150            "mz_internal_cluster_replicas",
151        ),
152        MigrationStep::replacement(
153            "26.19.0-dev.0",
154            CatalogItemType::MaterializedView,
155            MZ_INTERNAL_SCHEMA,
156            "mz_pending_cluster_replicas",
157        ),
158        MigrationStep::replacement(
159            "26.20.0-dev.0",
160            CatalogItemType::MaterializedView,
161            MZ_CATALOG_SCHEMA,
162            "mz_materialized_views",
163        ),
164        MigrationStep::replacement(
165            "26.22.0-dev.0",
166            CatalogItemType::MaterializedView,
167            MZ_CATALOG_SCHEMA,
168            "mz_connections",
169        ),
170        MigrationStep::replacement(
171            "26.22.0-dev.0",
172            CatalogItemType::MaterializedView,
173            MZ_CATALOG_SCHEMA,
174            "mz_secrets",
175        ),
176        MigrationStep::replacement(
177            "26.27.0-dev.0",
178            CatalogItemType::MaterializedView,
179            MZ_CATALOG_SCHEMA,
180            "mz_sources",
181        ),
182        MigrationStep::replacement(
183            "26.29.0-dev.0",
184            CatalogItemType::MaterializedView,
185            MZ_CATALOG_SCHEMA,
186            "mz_indexes",
187        ),
188        MigrationStep::replacement(
189            "26.29.0-dev.0",
190            CatalogItemType::MaterializedView,
191            MZ_CATALOG_SCHEMA,
192            "mz_roles",
193        ),
194        MigrationStep::replacement(
195            "26.29.0-dev.0",
196            CatalogItemType::MaterializedView,
197            MZ_CATALOG_SCHEMA,
198            "mz_role_parameters",
199        ),
200        // Required because we added `mz_cluster_replica_size_internal_ind` builtin
201        // index without bumping mz_indexes. make_mz_indexes inlines the builtin-index
202        // set as VALUES, so any add/remove changes its SQL fingerprint and requires
203        // an explicit replacement step.
204        MigrationStep::replacement(
205            "26.30.0-dev.0",
206            CatalogItemType::MaterializedView,
207            MZ_CATALOG_SCHEMA,
208            "mz_indexes",
209        ),
210        MigrationStep::replacement(
211            "26.30.0-dev.0",
212            CatalogItemType::MaterializedView,
213            MZ_CATALOG_SCHEMA,
214            "mz_clusters",
215        ),
216        MigrationStep::replacement(
217            "26.30.0-dev.0",
218            CatalogItemType::MaterializedView,
219            MZ_CATALOG_SCHEMA,
220            "mz_cluster_replicas",
221        ),
222        MigrationStep::replacement(
223            "26.30.0-dev.0",
224            CatalogItemType::MaterializedView,
225            MZ_INTERNAL_SCHEMA,
226            "mz_cluster_schedules",
227        ),
228        MigrationStep::replacement(
229            "26.30.0-dev.0",
230            CatalogItemType::MaterializedView,
231            MZ_CATALOG_SCHEMA,
232            "mz_default_privileges",
233        ),
234        MigrationStep::replacement(
235            "26.30.0-dev.0",
236            CatalogItemType::MaterializedView,
237            MZ_CATALOG_SCHEMA,
238            "mz_system_privileges",
239        ),
240    ]
241});
242
/// A migration required to upgrade past a specific version.
///
/// Steps are constructed through [`MigrationStep::evolution`] and
/// [`MigrationStep::replacement`] and collected in the `MIGRATIONS` list.
#[derive(Clone, Debug)]
struct MigrationStep {
    /// The build version that requires this migration.
    ///
    /// `plan_migration` only considers steps whose version is strictly greater than the
    /// version being migrated from.
    version: Version,
    /// The object that requires migration.
    object: SystemObjectDescription,
    /// The migration mechanism to be used.
    mechanism: Mechanism,
}
253
254impl MigrationStep {
255    /// Helper to construct an `Evolution` migration step.
256    fn evolution(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
257        Self {
258            version: Version::parse(version).expect("valid"),
259            object: SystemObjectDescription {
260                schema_name: schema.into(),
261                object_type: type_,
262                object_name: name.into(),
263            },
264            mechanism: Mechanism::Evolution,
265        }
266    }
267
268    /// Helper to construct a `Replacement` migration step.
269    fn replacement(version: &str, type_: CatalogItemType, schema: &str, name: &str) -> Self {
270        Self {
271            version: Version::parse(version).expect("valid"),
272            object: SystemObjectDescription {
273                schema_name: schema.into(),
274                object_type: type_,
275                object_name: name.into(),
276            },
277            mechanism: Mechanism::Replacement,
278        }
279    }
280}
281
/// The mechanism to use to migrate the schema of a builtin collection.
///
/// The derived ordering follows declaration order, i.e. `Evolution < Replacement`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
// NOTE(review): presumably kept so that a variant may go unused when no step in `MIGRATIONS`
// (or no caller) constructs it -- confirm whether this allowance is still needed.
#[allow(dead_code)]
enum Mechanism {
    /// Persist schema evolution.
    ///
    /// Keeps existing contents but only works for schema changes that are backward compatible
    /// according to [`backward_compatible`].
    Evolution,
    /// Shard replacement.
    ///
    /// Works for arbitrary schema changes but loses existing contents.
    Replacement,
}
296
/// The result of a builtin schema migration.
///
/// Returned by [`run`] to the code that opens the catalog.
pub(super) struct MigrationResult {
    /// IDs of items whose shards have been replaced using the `Replacement` mechanism.
    ///
    /// Empty when no migration was run.
    pub replaced_items: BTreeSet<CatalogItemId>,
    /// A cleanup action to take once the migration has been made durable.
    ///
    /// A no-op future when no migration was run.
    pub cleanup_action: BoxFuture<'static, ()>,
}
304
305impl Default for MigrationResult {
306    fn default() -> Self {
307        Self {
308            replaced_items: Default::default(),
309            cleanup_action: async {}.boxed(),
310        }
311    }
312}
313
314/// Run builtin schema migrations.
315///
316/// This is the entry point used by adapter when opening the catalog. It uses the hardcoded
317/// `BUILTINS` and `MIGRATIONS` lists to initialize the lists of available builtins and required
318/// migrations, respectively.
319pub(super) async fn run(
320    build_info: &BuildInfo,
321    deploy_generation: u64,
322    txn: &mut Transaction<'_>,
323    config: BuiltinItemMigrationConfig,
324) -> Result<MigrationResult, Error> {
325    // Sanity check to ensure we're not touching durable state in read-only mode.
326    assert_eq!(config.read_only, txn.is_savepoint());
327
328    // Tests may provide a dummy build info that confuses the migration step selection logic. Skip
329    // migrations if we observe this build info.
330    if *build_info == DUMMY_BUILD_INFO {
331        return Ok(MigrationResult::default());
332    }
333
334    let Some(durable_version) = get_migration_version(txn) else {
335        // New catalog; nothing to do.
336        return Ok(MigrationResult::default());
337    };
338    let build_version = build_info.semver_version();
339
340    let collection_metadata = txn.get_collection_metadata();
341    let system_objects = txn
342        .get_system_object_mappings()
343        .map(|m| {
344            let object = m.description;
345            let global_id = m.unique_identifier.global_id;
346            let shard_id = collection_metadata.get(&global_id).copied();
347            let Some((_, builtin)) = BUILTIN_LOOKUP.get(&object) else {
348                panic!("missing builtin {object:?}");
349            };
350            let info = ObjectInfo {
351                global_id,
352                shard_id,
353                builtin,
354                fingerprint: m.unique_identifier.fingerprint,
355            };
356            (object, info)
357        })
358        .collect();
359
360    let migration_shard = txn.get_builtin_migration_shard().expect("must exist");
361
362    let migration = Migration {
363        source_version: durable_version.clone(),
364        target_version: build_version.clone(),
365        deploy_generation,
366        system_objects,
367        migration_shard,
368        config,
369    };
370
371    let result = migration.run(&MIGRATIONS).await.map_err(|e| {
372        Error::new(ErrorKind::FailedBuiltinSchemaMigration {
373            last_seen_version: durable_version.to_string(),
374            this_version: build_version.to_string(),
375            cause: e.to_string(),
376        })
377    })?;
378
379    result.apply(txn);
380
381    let replaced_items = txn
382        .get_system_object_mappings()
383        .map(|m| m.unique_identifier)
384        .filter(|ids| result.new_shards.contains_key(&ids.global_id))
385        .map(|ids| ids.catalog_id)
386        .collect();
387
388    Ok(MigrationResult {
389        replaced_items,
390        cleanup_action: result.cleanup_action,
391    })
392}
393
/// Result produced by `Migration::run`.
///
/// The durable parts are written to the catalog transaction by [`MigrationRunResult::apply`].
struct MigrationRunResult {
    /// New shard assignments, keyed by the global ID of the migrated collection. `apply`
    /// replaces the existing collection metadata for these IDs with the shards given here.
    new_shards: BTreeMap<GlobalId, ShardId>,
    /// New fingerprints to store in the system object mappings of the described objects.
    new_fingerprints: BTreeMap<SystemObjectDescription, String>,
    /// Additional shards that `apply` registers as unfinalized, alongside the shards that were
    /// displaced by `new_shards`.
    shards_to_finalize: BTreeSet<ShardId>,
    /// Cleanup action, handed through to the caller as `MigrationResult::cleanup_action`.
    cleanup_action: BoxFuture<'static, ()>,
}
401
402impl Default for MigrationRunResult {
403    fn default() -> Self {
404        Self {
405            new_shards: BTreeMap::new(),
406            new_fingerprints: BTreeMap::new(),
407            shards_to_finalize: BTreeSet::new(),
408            cleanup_action: async {}.boxed(),
409        }
410    }
411}
412
413impl MigrationRunResult {
414    /// Apply this migration result to the given transaction.
415    fn apply(&self, txn: &mut Transaction<'_>) {
416        // Update collection metadata.
417        let replaced_ids = self.new_shards.keys().copied().collect();
418        let old_metadata = txn.delete_collection_metadata(replaced_ids);
419        txn.insert_collection_metadata(self.new_shards.clone())
420            .expect("inserting unique shards IDs after deleting existing entries");
421
422        // Register shards for finalization.
423        let mut unfinalized_shards: BTreeSet<_> =
424            old_metadata.into_iter().map(|(_, sid)| sid).collect();
425        unfinalized_shards.extend(self.shards_to_finalize.iter().copied());
426        txn.insert_unfinalized_shards(unfinalized_shards)
427            .expect("cannot fail");
428
429        // Update fingerprints.
430        let mappings = txn
431            .get_system_object_mappings()
432            .filter_map(|m| {
433                let fingerprint = self.new_fingerprints.get(&m.description)?;
434                Some(SystemObjectMapping {
435                    description: m.description,
436                    unique_identifier: SystemObjectUniqueIdentifier {
437                        catalog_id: m.unique_identifier.catalog_id,
438                        global_id: m.unique_identifier.global_id,
439                        fingerprint: fingerprint.clone(),
440                    },
441                })
442            })
443            .collect();
444        txn.set_system_object_mappings(mappings)
445            .expect("filtered existing mappings remain unique");
446    }
447}
448
/// Information about a system object required to run a `Migration`.
#[derive(Clone, Debug)]
struct ObjectInfo {
    /// The global ID recorded in the object's durable system object mapping.
    global_id: GlobalId,
    /// The shard registered for `global_id` in the collection metadata, or `None` if no shard
    /// has been registered for it.
    shard_id: Option<ShardId>,
    /// The builtin definition this object resolves to in `BUILTIN_LOOKUP`.
    builtin: &'static Builtin<NameReference>,
    /// The fingerprint recorded in the object's durable system object mapping.
    fingerprint: String,
}
457
/// Context of a builtin schema migration.
struct Migration {
    /// The version we are migrating from.
    ///
    /// Same as the build version of the most recent leader process that successfully performed
    /// migrations.
    source_version: Version,
    /// The version we are migrating to.
    ///
    /// Same as the build version of this process.
    target_version: Version,
    /// The deploy generation of this process.
    deploy_generation: u64,
    /// Information about all objects in the system.
    system_objects: BTreeMap<SystemObjectDescription, ObjectInfo>,
    /// The ID of the migration shard.
    migration_shard: ShardId,
    /// Additional configuration.
    config: BuiltinItemMigrationConfig,
}
478
479impl Migration {
480    async fn run(self, steps: &[MigrationStep]) -> anyhow::Result<MigrationRunResult> {
481        info!(
482            deploy_generation = %self.deploy_generation,
483            "running builtin schema migration: {} -> {}",
484            self.source_version, self.target_version
485        );
486
487        self.validate_migration_steps(steps);
488
489        // Version-based migration filter fails for dev versions, see for example
490        // https://github.com/MaterializeInc/database-issues/issues/11335
491        let force_migration = if self.source_version != self.target_version
492            && self.source_version.pre.as_str().starts_with("dev")
493            && self.config.force_migration.is_none()
494        {
495            Some("evolution".to_string())
496        } else {
497            self.config.force_migration.clone()
498        };
499
500        let (force, plan) = match force_migration.as_deref() {
501            None => (false, self.plan_migration(steps)),
502            Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
503            Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
504            Some(other) => panic!("unknown force migration mechanism: {other}"),
505        };
506
507        if self.source_version == self.target_version && !force {
508            info!("skipping migration: already at target version");
509            return Ok(MigrationRunResult::default());
510        } else if self.source_version > self.target_version {
511            bail!("downgrade not supported");
512        }
513
514        // In leader mode, upgrade the version of the migration shard to the target version.
515        // This fences out any readers at lower versions.
516        if !self.config.read_only {
517            self.upgrade_migration_shard_version().await;
518        }
519
520        info!("executing migration plan: {plan:?}");
521
522        self.migrate_evolve(&plan.evolve).await?;
523        let new_shards = self.migrate_replace(&plan.replace).await?;
524
525        let mut migrated_objects = BTreeSet::new();
526        migrated_objects.extend(plan.evolve);
527        migrated_objects.extend(plan.replace);
528
529        let new_fingerprints = self.update_fingerprints(&migrated_objects)?;
530
531        let (shards_to_finalize, cleanup_action) = self.cleanup().await?;
532
533        Ok(MigrationRunResult {
534            new_shards,
535            new_fingerprints,
536            shards_to_finalize,
537            cleanup_action,
538        })
539    }
540
541    /// Sanity check the given migration steps.
542    ///
543    /// If any of these checks fail, that's a bug in Materialize, and we panic immediately.
544    fn validate_migration_steps(&self, steps: &[MigrationStep]) {
545        for step in steps {
546            assert!(
547                step.version <= self.target_version,
548                "migration step version greater than target version: {} > {}",
549                step.version,
550                self.target_version,
551            );
552
553            let object = &step.object;
554
555            // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly, it would
556            // cause the table to be truncated because the contents are not also stored in the durable
557            // catalog. Secondly, we prune `mz_storage_usage_by_shard` of old events in the background
558            // on startup. The correctness of that pruning relies on there being no other retractions
559            // to `mz_storage_usage_by_shard`.
560            //
561            // TODO: Confirm the above reasoning, it might be outdated?
562            assert_ne!(
563                &*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object,
564                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
565            );
566
567            // Same hazard as `mz_storage_usage_by_shard`: the startup pruner
568            // (`Coordinator::prune_arrangement_sizes_history_on_startup`) assumes it is
569            // the only source of retractions, so migration-driven truncation would break it.
570            assert_ne!(
571                &*MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, object,
572                "mz_object_arrangement_size_history cannot be migrated or else the table will be truncated"
573            );
574
575            // `mz_catalog_raw` cannot be migrated because it contains the durable catalog and it
576            // wouldn't be very durable if we allowed it to be truncated.
577            assert_ne!(
578                &*MZ_CATALOG_RAW_DESCRIPTION, object,
579                "mz_catalog_raw cannot be migrated"
580            );
581
582            let Some(object_info) = self.system_objects.get(object) else {
583                panic!("migration step for non-existent builtin: {object:?}");
584            };
585
586            let builtin = object_info.builtin;
587            use Builtin::*;
588            assert!(
589                matches!(builtin, Table(..) | Source(..) | MaterializedView(..)),
590                "schema migration not supported for builtin: {builtin:?}",
591            );
592        }
593    }
594
595    /// Select for each object to migrate the appropriate migration mechanism.
596    fn plan_migration(&self, steps: &[MigrationStep]) -> Plan {
597        // Ignore any steps at versions before `source_version`.
598        let steps = steps.iter().filter(|s| s.version > self.source_version);
599
600        // Select a mechanism for each object, according to the requested migrations:
601        //  * If any `Replacement` was requested, use `Replacement`.
602        //  * Otherwise, (i.e. only `Evolution` was requested), use `Evolution`.
603        let mut by_object = BTreeMap::new();
604        for step in steps {
605            if let Some(entry) = by_object.get_mut(&step.object) {
606                *entry = match (step.mechanism, *entry) {
607                    (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution,
608                    (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => {
609                        Mechanism::Replacement
610                    }
611                };
612            } else {
613                by_object.insert(step.object.clone(), step.mechanism);
614            }
615        }
616
617        let mut plan = Plan::default();
618        for (object, mechanism) in by_object {
619            match mechanism {
620                Mechanism::Evolution => plan.evolve.push(object),
621                Mechanism::Replacement => plan.replace.push(object),
622            }
623        }
624
625        plan
626    }
627
628    /// Plan a forced migration of all objects using the given mechanism.
629    fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
630        let objects = self
631            .system_objects
632            .iter()
633            // Skip objects that don't yet have a shard registered. These are brand-new builtins
634            // added in this version; the leader will allocate their shards during bootstrap, and
635            // there is nothing to evolve or replace.
636            .filter(|(_, info)| info.shard_id.is_some())
637            .filter(|(_, info)| {
638                use Builtin::*;
639                match info.builtin {
640                    // Filter out the 'mz_storage_usage_by_shard' table since we need to retain
641                    // that info for billing purposes.
642                    Table(table) => **table != *MZ_STORAGE_USAGE_BY_SHARD,
643                    MaterializedView(..) => true,
644                    Source(source) => **source != *MZ_CATALOG_RAW,
645                    Log(..) | View(..) | Type(..) | Func(..) | Index(..) | Connection(..) => false,
646                }
647            })
648            .map(|(object, _)| object.clone())
649            .collect();
650
651        let mut plan = Plan::default();
652        match mechanism {
653            Mechanism::Evolution => plan.evolve = objects,
654            Mechanism::Replacement => plan.replace = objects,
655        }
656
657        plan
658    }
659
660    /// Upgrade the migration shard to the target version.
661    async fn upgrade_migration_shard_version(&self) {
662        let persist = &self.config.persist_client;
663        let diagnostics = Diagnostics {
664            shard_name: "builtin_migration".to_string(),
665            handle_purpose: format!("migration shard upgrade @ {}", self.target_version),
666        };
667
668        persist
669            .upgrade_version::<migration_shard::Key, ShardId, Timestamp, StorageDiff>(
670                self.migration_shard,
671                diagnostics,
672            )
673            .await
674            .expect("valid usage");
675    }
676
677    /// Migrate the given objects using the `Evolution` mechanism.
678    async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
679        for object in objects {
680            self.migrate_evolve_one(object).await?;
681        }
682        Ok(())
683    }
684
685    async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> {
686        let persist = &self.config.persist_client;
687
688        let Some(object_info) = self.system_objects.get(object) else {
689            bail!("missing builtin {object:?}");
690        };
691        let id = object_info.global_id;
692
693        let Some(shard_id) = object_info.shard_id else {
694            // No shard is registered for this builtin. In leader mode, this is fine, we'll
695            // register the shard during bootstrap. In read-only mode, we might be racing with the
696            // leader to register the shard and it's unclear what sort of confusion can arise from
697            // that -- better to bail out in this case.
698            if self.config.read_only {
699                bail!("missing shard ID for builtin {object:?} ({id})");
700            } else {
701                return Ok(());
702            }
703        };
704
705        let target_desc = match object_info.builtin {
706            Builtin::Table(table) => &table.desc,
707            Builtin::Source(source) => &source.desc,
708            Builtin::MaterializedView(mv) => &mv.desc,
709            _ => bail!("not a storage collection: {object:?}"),
710        };
711
712        let diagnostics = Diagnostics {
713            shard_name: id.to_string(),
714            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
715        };
716        let source_schema = persist
717            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
718            .await
719            .expect("valid usage");
720
721        info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution");
722
723        if self.config.read_only {
724            // In read-only mode, only check that the new schema is backward compatible.
725            // We'll register it when/if we restart in leader mode.
726            if let Some((_, source_desc, _)) = &source_schema {
727                let old = mz_persist_types::columnar::data_type::<SourceData>(source_desc)?;
728                let new = mz_persist_types::columnar::data_type::<SourceData>(target_desc)?;
729                if backward_compatible(&old, &new).is_none() {
730                    bail!(
731                        "incompatible schema evolution for {object:?}: \
732                         {source_desc:?} -> {target_desc:?}"
733                    );
734                }
735            }
736
737            return Ok(());
738        }
739
740        let (mut schema_id, mut source_desc) = match source_schema {
741            Some((schema_id, source_desc, _)) => (schema_id, source_desc),
742            None => {
743                // If no schema was previously registered, simply try to register the new one. This
744                // might fail due to a concurrent registration, in which case we'll fall back to
745                // `compare_and_evolve_schema`.
746
747                debug!(%id, %shard_id, "no previous schema found; registering initial one");
748                let schema_id = persist
749                    .register_schema::<SourceData, (), Timestamp, StorageDiff>(
750                        shard_id,
751                        target_desc,
752                        &UnitSchema,
753                        diagnostics.clone(),
754                    )
755                    .await
756                    .expect("valid usage");
757                if schema_id.is_some() {
758                    return Ok(());
759                }
760
761                debug!(%id, %shard_id, "schema registration failed; falling back to CaES");
762                let (schema_id, source_desc, _) = persist
763                    .latest_schema::<SourceData, (), Timestamp, StorageDiff>(
764                        shard_id,
765                        diagnostics.clone(),
766                    )
767                    .await
768                    .expect("valid usage")
769                    .expect("known to exist");
770
771                (schema_id, source_desc)
772            }
773        };
774
775        loop {
776            // Evolving the schema might fail if another process evolved the schema concurrently,
777            // in which case we need to retry. Most likely the other process evolved the schema to
778            // our own target schema and the second try is a no-op.
779
780            debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES");
781            let result = persist
782                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
783                    shard_id,
784                    schema_id,
785                    target_desc,
786                    &UnitSchema,
787                    diagnostics.clone(),
788                )
789                .await
790                .expect("valid usage");
791
792            match result {
793                CaESchema::Ok(schema_id) => {
794                    debug!(%id, %shard_id, %schema_id, "schema evolved successfully");
795                    break;
796                }
797                CaESchema::Incompatible => bail!(
798                    "incompatible schema evolution for {object:?}: \
799                     {source_desc:?} -> {target_desc:?}"
800                ),
801                CaESchema::ExpectedMismatch {
802                    schema_id: new_id,
803                    key,
804                    val: UnitSchema,
805                } => {
806                    schema_id = new_id;
807                    source_desc = key;
808                }
809            }
810        }
811
812        Ok(())
813    }
814
815    /// Migrate the given objects using the `Replacement` mechanism.
816    async fn migrate_replace(
817        &self,
818        objects: &[SystemObjectDescription],
819    ) -> anyhow::Result<BTreeMap<GlobalId, ShardId>> {
820        if objects.is_empty() {
821            return Ok(Default::default());
822        }
823
824        let diagnostics = Diagnostics {
825            shard_name: "builtin_migration".to_string(),
826            handle_purpose: format!("builtin schema migration @ {}", self.target_version),
827        };
828        let (mut persist_write, mut persist_read) =
829            self.open_migration_shard(diagnostics.clone()).await;
830
831        let mut ids_to_replace = BTreeSet::new();
832        for object in objects {
833            if let Some(info) = self.system_objects.get(object) {
834                ids_to_replace.insert(info.global_id);
835            } else {
836                bail!("missing id for builtin {object:?}");
837            }
838        }
839
840        info!(?objects, ?ids_to_replace, "migrating by replacement");
841
842        // Fetch replacement shard IDs from the migration shard, or insert new ones if none exist.
843        // This can fail due to writes by concurrent processes, so we need to retry.
844        let replaced_shards = loop {
845            if let Some(shards) = self
846                .try_get_or_insert_replacement_shards(
847                    &ids_to_replace,
848                    &mut persist_write,
849                    &mut persist_read,
850                )
851                .await?
852            {
853                break shards;
854            }
855        };
856
857        Ok(replaced_shards)
858    }
859
860    /// Try to get or insert replacement shards for the given IDs into the migration shard, at
861    /// `target_version` and `deploy_generation`.
862    ///
863    /// This method looks for existing entries in the migration shards and returns those if they
864    /// are present. Otherwise it generates new shard IDs and tries to insert them.
865    ///
866    /// The result of this call is `None` if no existing entries were found and inserting new ones
867    /// failed because of a concurrent write to the migration shard. In this case, the caller is
868    /// expected to retry.
869    async fn try_get_or_insert_replacement_shards(
870        &self,
871        ids_to_replace: &BTreeSet<GlobalId>,
872        persist_write: &mut WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
873        persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
874    ) -> anyhow::Result<Option<BTreeMap<GlobalId, ShardId>>> {
875        let upper = persist_write.fetch_recent_upper().await;
876        let write_ts = *upper.as_option().expect("migration shard not sealed");
877
878        let mut ids_to_replace = ids_to_replace.clone();
879        let mut replaced_shards = BTreeMap::new();
880
881        // Another process might already have done a shard replacement at our version and
882        // generation, in which case we can directly reuse the replacement shards.
883        //
884        // Note that we can't assume that the previous process had the same `ids_to_replace` as we
885        // do. The set of migrations to run depends on both the source and the target version, and
886        // the migration shard is not keyed by source version. The previous writer might have seen
887        // a different source version, if there was a concurrent migration by a leader process.
888        if let Some(read_ts) = write_ts.step_back() {
889            let pred = |key: &migration_shard::Key| {
890                key.build_version == self.target_version
891                    && key.deploy_generation == Some(self.deploy_generation)
892            };
893            if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
894                for (key, shard_id) in entries {
895                    let id = GlobalId::System(key.global_id);
896                    if ids_to_replace.remove(&id) {
897                        replaced_shards.insert(id, shard_id);
898                    }
899                }
900
901                debug!(
902                    %read_ts, ?replaced_shards, ?ids_to_replace,
903                    "found existing entries in migration shard",
904                );
905            }
906
907            if ids_to_replace.is_empty() {
908                return Ok(Some(replaced_shards));
909            }
910        }
911
912        // Generate new shard IDs and attempt to insert them into the migration shard. If we get a
913        // CaA failure at `write_ts` that means a concurrent process has inserted in the meantime
914        // and we need to re-check the migration shard contents.
915        let mut updates = Vec::new();
916        for id in ids_to_replace {
917            let shard_id = ShardId::new();
918            replaced_shards.insert(id, shard_id);
919
920            let GlobalId::System(global_id) = id else {
921                bail!("attempt to migrate a non-system collection: {id}");
922            };
923            let key = migration_shard::Key {
924                global_id,
925                build_version: self.target_version.clone(),
926                deploy_generation: Some(self.deploy_generation),
927            };
928            updates.push(((key, shard_id), write_ts, 1));
929        }
930
931        let upper = Antichain::from_elem(write_ts);
932        let new_upper = Antichain::from_elem(write_ts.step_forward());
933        debug!(%write_ts, "attempting insert into migration shard");
934        let result = persist_write
935            .compare_and_append(updates, upper, new_upper)
936            .await
937            .expect("valid usage");
938
939        match result {
940            Ok(()) => {
941                debug!(
942                    %write_ts, ?replaced_shards,
943                    "successfully inserted into migration shard"
944                );
945                Ok(Some(replaced_shards))
946            }
947            Err(_mismatch) => Ok(None),
948        }
949    }
950
951    /// Open writer and reader for the migration shard.
952    async fn open_migration_shard(
953        &self,
954        diagnostics: Diagnostics,
955    ) -> (
956        WriteHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
957        ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
958    ) {
959        let persist = &self.config.persist_client;
960
961        persist
962            .open(
963                self.migration_shard,
964                Arc::new(migration_shard::KeySchema),
965                Arc::new(ShardIdSchema),
966                diagnostics,
967                USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()),
968            )
969            .await
970            .expect("valid usage")
971    }
972
973    /// Open a [`SinceHandle`] for the migration shard.
974    async fn open_migration_shard_since(
975        &self,
976        diagnostics: Diagnostics,
977    ) -> SinceHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff> {
978        self.config
979            .persist_client
980            .open_critical_since(
981                self.migration_shard,
982                // TODO: We may need to use a different critical reader
983                // id for this if we want to be able to introspect it via SQL.
984                PersistClient::CONTROLLER_CRITICAL_SINCE,
985                Opaque::encode(&i64::MIN),
986                diagnostics.clone(),
987            )
988            .await
989            .expect("valid usage")
990    }
991
992    /// Update the fingerprints for `migrated_items`.
993    ///
994    /// Returns the new fingerprints. Also asserts that the current fingerprints of all other
995    /// system items match their builtin definitions.
996    fn update_fingerprints(
997        &self,
998        migrated_items: &BTreeSet<SystemObjectDescription>,
999    ) -> anyhow::Result<BTreeMap<SystemObjectDescription, String>> {
1000        let mut new_fingerprints = BTreeMap::new();
1001        for (object, object_info) in &self.system_objects {
1002            let id = object_info.global_id;
1003            let builtin = object_info.builtin;
1004
1005            let fingerprint = builtin.fingerprint();
1006            if fingerprint == object_info.fingerprint {
1007                continue; // fingerprint unchanged, nothing to do
1008            }
1009
1010            // Fingerprint mismatch is expected for a migrated item.
1011            let migrated = migrated_items.contains(object);
1012            // Some builtin types have schemas but no durable state. No migration needed for those.
1013            let ephemeral = matches!(
1014                builtin,
1015                Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_),
1016            );
1017
1018            if migrated || ephemeral {
1019                new_fingerprints.insert(object.clone(), fingerprint);
1020            } else if builtin.runtime_alterable() {
1021                // Runtime alterable builtins have no meaningful builtin fingerprint, and a
1022                // sentinel value stored in the catalog.
1023                assert_eq!(
1024                    object_info.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
1025                    "fingerprint mismatch for runtime-alterable builtin {object:?} ({id})",
1026                );
1027            } else {
1028                panic!(
1029                    "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}",
1030                    fingerprint, object_info.fingerprint,
1031                );
1032            }
1033        }
1034
1035        Ok(new_fingerprints)
1036    }
1037
1038    /// Perform cleanup of migration state, i.e. the migration shard.
1039    ///
1040    /// Returns a list of shards to finalize, and a `Future` that must be run after the shard
1041    /// finalization has been durably enqueued. The `Future` is used to remove entries from the
1042    /// migration shard only after we know the respective shards will be finalized. Removing
1043    /// entries immediately would risk leaking the shards.
1044    ///
1045    /// We only perform cleanup in leader mode, to keep the durable state changes made by read-only
1046    /// processes a minimal as possible. Given that Materialize doesn't support version downgrades,
1047    /// it is safe to assume that any state for versions below the `target_version` is not needed
1048    /// anymore and can be cleaned up.
1049    ///
1050    /// Note that it is fine for cleanup to sometimes fail or be skipped. The size of the migration
1051    /// shard should always be pretty small, so keeping migration state around for longer isn't a
1052    /// concern. As a result, we can keep the logic simple here and skip doing cleanup in response
1053    /// to transient failures, instead of retrying.
1054    async fn cleanup(&self) -> anyhow::Result<(BTreeSet<ShardId>, BoxFuture<'static, ()>)> {
1055        let noop_action = async {}.boxed();
1056        let noop_result = (BTreeSet::new(), noop_action);
1057
1058        if self.config.read_only {
1059            return Ok(noop_result);
1060        }
1061
1062        let diagnostics = Diagnostics {
1063            shard_name: "builtin_migration".to_string(),
1064            handle_purpose: "builtin schema migration cleanup".into(),
1065        };
1066        let (mut persist_write, mut persist_read) =
1067            self.open_migration_shard(diagnostics.clone()).await;
1068        let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await;
1069
1070        let upper = persist_write.fetch_recent_upper().await.clone();
1071        let write_ts = *upper.as_option().expect("migration shard not sealed");
1072        let Some(read_ts) = write_ts.step_back() else {
1073            return Ok(noop_result);
1074        };
1075
1076        // Collect old entries to remove.
1077        let pred = |key: &migration_shard::Key| key.build_version < self.target_version;
1078        let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await
1079        else {
1080            return Ok(noop_result);
1081        };
1082
1083        debug!(
1084            ?stale_entries,
1085            "cleaning migration shard up to version {}", self.target_version,
1086        );
1087
1088        let current_shards: BTreeMap<_, _> = self
1089            .system_objects
1090            .values()
1091            .filter_map(|o| o.shard_id.map(|shard_id| (o.global_id, shard_id)))
1092            .collect();
1093
1094        let mut shards_to_finalize = BTreeSet::new();
1095        let mut retractions = Vec::new();
1096        for (key, shard_id) in stale_entries {
1097            // The migration shard contains both shards created during aborted upgrades and shards
1098            // created during successful upgrades. The latter may still be in use, so we have to
1099            // check and only finalize those that aren't anymore.
1100            let gid = GlobalId::System(key.global_id);
1101            if current_shards.get(&gid) != Some(&shard_id) {
1102                shards_to_finalize.insert(shard_id);
1103            }
1104
1105            retractions.push(((key, shard_id), write_ts, -1));
1106        }
1107
1108        let cleanup_action = async move {
1109            if !retractions.is_empty() {
1110                let new_upper = Antichain::from_elem(write_ts.step_forward());
1111                let result = persist_write
1112                    .compare_and_append(retractions, upper, new_upper)
1113                    .await
1114                    .expect("valid usage");
1115                match result {
1116                    Ok(()) => debug!("cleaned up migration shard"),
1117                    Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"),
1118                }
1119            }
1120        }
1121        .boxed();
1122
1123        // Downgrade the since, to enable some compaction.
1124        let o = persist_since.opaque().clone();
1125        let new_since = Antichain::from_elem(read_ts);
1126        let result = persist_since
1127            .maybe_compare_and_downgrade_since(&o, (&o, &new_since))
1128            .await;
1129        soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch");
1130
1131        Ok((shards_to_finalize, cleanup_action))
1132    }
1133}
1134
1135/// Read the migration shard at the given timestamp, returning all entries that match the given
1136/// predicate.
1137///
1138/// Returns `None` if the migration shard contains no matching entries, or if it isn't readable at
1139/// `read_ts`.
1140async fn read_migration_shard<P>(
1141    persist_read: &mut ReadHandle<migration_shard::Key, ShardId, Timestamp, StorageDiff>,
1142    read_ts: Timestamp,
1143    predicate: P,
1144) -> Option<Vec<(migration_shard::Key, ShardId)>>
1145where
1146    P: for<'a> Fn(&migration_shard::Key) -> bool,
1147{
1148    let as_of = Antichain::from_elem(read_ts);
1149    let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?;
1150
1151    assert!(
1152        updates.iter().all(|(_, _, diff)| *diff == 1),
1153        "migration shard contains invalid diffs: {updates:?}",
1154    );
1155
1156    let entries: Vec<_> = updates
1157        .into_iter()
1158        .map(|(data, _, _)| data)
1159        .filter(move |(key, _)| predicate(key))
1160        .collect();
1161
1162    (!entries.is_empty()).then_some(entries)
1163}
1164
1165/// A plan to migrate between two versions.
1166#[derive(Debug, Default)]
1167struct Plan {
1168    /// Objects to migrate using the `Evolution` mechanism.
1169    evolve: Vec<SystemObjectDescription>,
1170    /// Objects to migrate using the `Replacement` mechanism.
1171    replace: Vec<SystemObjectDescription>,
1172}
1173
1174/// Types and persist codec impls for the migration shard used by the `Replacement` mechanism.
1175mod migration_shard {
1176    use std::fmt;
1177    use std::str::FromStr;
1178
1179    use arrow::array::{StringArray, StringBuilder};
1180    use bytes::{BufMut, Bytes};
1181    use mz_persist_types::Codec;
1182    use mz_persist_types::codec_impls::{
1183        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
1184    };
1185    use mz_persist_types::columnar::Schema;
1186    use mz_persist_types::stats::NoneStats;
1187    use semver::Version;
1188    use serde::{Deserialize, Serialize};
1189
1190    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
1191    pub(super) struct Key {
1192        pub(super) global_id: u64,
1193        pub(super) build_version: Version,
1194        // Versions < 26.0 didn't include the deploy generation. As long as we still might
1195        // encounter migration shard entries that don't have it, we need to keep this an `Option`
1196        // and keep supporting both key formats.
1197        pub(super) deploy_generation: Option<u64>,
1198    }
1199
1200    impl fmt::Display for Key {
1201        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1202            if self.deploy_generation.is_some() {
1203                // current format
1204                let s = serde_json::to_string(self).expect("JSON serializable");
1205                f.write_str(&s)
1206            } else {
1207                // pre-26.0 format
1208                write!(f, "{}-{}", self.global_id, self.build_version)
1209            }
1210        }
1211    }
1212
1213    impl FromStr for Key {
1214        type Err = String;
1215
1216        fn from_str(s: &str) -> Result<Self, String> {
1217            // current format
1218            if let Ok(key) = serde_json::from_str(s) {
1219                return Ok(key);
1220            };
1221
1222            // pre-26.0 format
1223            let parts: Vec<_> = s.splitn(2, '-').collect();
1224            let &[global_id, build_version] = parts.as_slice() else {
1225                return Err(format!("invalid Key '{s}'"));
1226            };
1227            let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
1228            let build_version = build_version
1229                .parse::<Version>()
1230                .map_err(|e| e.to_string())?;
1231            Ok(Key {
1232                global_id,
1233                build_version,
1234                deploy_generation: None,
1235            })
1236        }
1237    }
1238
1239    impl Default for Key {
1240        fn default() -> Self {
1241            Self {
1242                global_id: Default::default(),
1243                build_version: Version::new(0, 0, 0),
1244                deploy_generation: Some(0),
1245            }
1246        }
1247    }
1248
1249    impl Codec for Key {
1250        type Schema = KeySchema;
1251        type Storage = ();
1252
1253        fn codec_name() -> String {
1254            "TableKey".into()
1255        }
1256
1257        fn encode<B: BufMut>(&self, buf: &mut B) {
1258            buf.put(self.to_string().as_bytes())
1259        }
1260
1261        fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result<Self, String> {
1262            let s = str::from_utf8(buf).map_err(|e| e.to_string())?;
1263            s.parse()
1264        }
1265
1266        fn encode_schema(_schema: &KeySchema) -> Bytes {
1267            Bytes::new()
1268        }
1269
1270        fn decode_schema(buf: &Bytes) -> Self::Schema {
1271            assert_eq!(*buf, Bytes::new());
1272            KeySchema
1273        }
1274    }
1275
1276    impl SimpleColumnarData for Key {
1277        type ArrowBuilder = StringBuilder;
1278        type ArrowColumn = StringArray;
1279
1280        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
1281            builder.values_slice().len()
1282        }
1283
1284        fn push(&self, builder: &mut Self::ArrowBuilder) {
1285            builder.append_value(&self.to_string());
1286        }
1287
1288        fn push_null(builder: &mut Self::ArrowBuilder) {
1289            builder.append_null();
1290        }
1291
1292        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
1293            *self = column.value(idx).parse().expect("valid Key");
1294        }
1295    }
1296
1297    #[derive(Debug, PartialEq)]
1298    pub(super) struct KeySchema;
1299
1300    impl Schema<Key> for KeySchema {
1301        type ArrowColumn = StringArray;
1302        type Statistics = NoneStats;
1303        type Decoder = SimpleColumnarDecoder<Key>;
1304        type Encoder = SimpleColumnarEncoder<Key>;
1305
1306        fn encoder(&self) -> anyhow::Result<SimpleColumnarEncoder<Key>> {
1307            Ok(SimpleColumnarEncoder::default())
1308        }
1309
1310        fn decoder(&self, col: StringArray) -> anyhow::Result<SimpleColumnarDecoder<Key>> {
1311            Ok(SimpleColumnarDecoder::new(col))
1312        }
1313    }
1314}
1315
1316#[cfg(test)]
1317#[path = "builtin_schema_migration_tests.rs"]
1318mod tests;