Skip to main content

mz_catalog/durable/
upgrade.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module contains all the helpers and code paths for upgrading/migrating the `Catalog`.
11//!
//! We facilitate migrations by keeping snapshots of the objects we previously stored, and relying
//! entirely on these snapshots. These snapshots exist in the [`mz_catalog_protos`] crate in the
//! form of `catalog-protos/protos/objects_vXX.proto`. By maintaining and relying on snapshots we
//! don't have to worry about changes elsewhere in the codebase affecting our migrations, because
//! our application and serialization logic are decoupled and the objects of the Catalog for a
//! given version are "frozen in time".
18//!
19//! > **Note**: The protobuf snapshot files themselves live in a separate crate because it takes a
20//!             relatively significant amount of time to codegen and build them. By placing them in
21//!             a separate crate we don't have to pay this compile time cost when building the
22//!             catalog, allowing for faster iteration.
23//!
//! You cannot make any changes to the following messages or anything they depend on:
25//!
26//!   - Config
27//!   - Setting
28//!   - FenceToken
29//!   - AuditLog
30//!
31//! When you want to make a change to the `Catalog` you need to follow these steps:
32//!
33//! 1. Check the current [`CATALOG_VERSION`], make sure an `objects_v<CATALOG_VERSION>.proto` file
34//!    exists. If one doesn't, copy and paste the current `objects.proto` file, renaming it to
35//!    `objects_v<CATALOG_VERSION>.proto`.
36//! 2. Bump [`CATALOG_VERSION`] by one.
37//! 3. Make your changes to `objects.proto`.
38//! 4. Copy and paste `objects.proto`, naming the copy `objects_v<CATALOG_VERSION>.proto`. Update
39//!    the package name of the `.proto` to `package objects_v<CATALOG_VERSION>;`
40//! 5. We should now have a copy of the protobuf objects as they currently exist, and a copy of
41//!    how we want them to exist. For example, if the version of the Catalog before we made our
42//!    changes was 15, we should now have `objects_v15.proto` and `objects_v16.proto`.
43//! 6. Rebuild Materialize which will error because the hashes stored in
44//!    `src/catalog-protos/protos/hashes.json` have now changed. Update these to match the new
45//!    hashes for objects.proto and `objects_v<CATALOG_VERSION>.proto`.
//! 7. Add `v<CATALOG_VERSION>` to the second (post-v79) list in the call to the `objects!` macro
//!    in this file.
47//! 8. Add a new file to `catalog/src/durable/upgrade` which is where we'll put the new migration
48//!    path.
//! 9. Write upgrade functions using the two versions of the protos we now have, e.g.
//!    `objects_v15.proto` and `objects_v16.proto`. In this migration code you __should not__
//!    import any defaults or constants from elsewhere in the codebase, because a future
//!    change could then impact a previous migration.
//! 10. Add a module declaration for your new file to this file: `mod v<CATALOG_VERSION-1>_to_v<CATALOG_VERSION>;`
54//! 11. Call your upgrade function in [`run_upgrade()`].
55//! 12. Generate a test file for the new version:
56//!     ```ignore
57//!     cargo test --package mz-catalog --lib durable::upgrade::tests::generate_missing_encodings -- --ignored
58//!     ```
59//!
60//! When in doubt, reach out to the Surfaces team, and we'll be more than happy to help :)
61
62pub mod json_compatible;
63#[cfg(test)]
64mod tests;
65
66use mz_ore::{soft_assert_eq_or_log, soft_assert_ne_or_log};
67use mz_repr::Diff;
68use paste::paste;
69#[cfg(test)]
70use proptest::prelude::*;
71#[cfg(test)]
72use proptest::strategy::ValueTree;
73#[cfg(test)]
74use proptest_derive::Arbitrary;
75use timely::progress::Timestamp as TimelyTimestamp;
76
77use crate::durable::initialize::USER_VERSION_KEY;
78use crate::durable::objects::serialization::proto;
79use crate::durable::objects::state_update::{
80    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
81};
82use crate::durable::persist::{Mode, Timestamp, UnopenedPersistCatalogState};
83use crate::durable::{CatalogError, DurableCatalogError};
84
85#[cfg(test)]
86const ENCODED_TEST_CASES: usize = 100;
87
88/// Generate per-version support code.
89///
90/// Here we have to deal with the fact that the pre-v79 objects had a protobuf-generated format,
91/// which gives them additional levels of indirection that the post-v79 objects don't have and thus
92/// requires slightly different code to be generated.
93macro_rules! objects {
94    ( [$( $x_old:ident ),*], [$( $x:ident ),*] ) => {
95        paste! {
96            $(
97                pub(crate) mod [<objects_ $x_old>] {
98                    pub use mz_catalog_protos::[<objects_ $x_old>]::*;
99
100                    use crate::durable::objects::state_update::StateUpdateKindJson;
101
102                    impl From<StateUpdateKind> for StateUpdateKindJson {
103                        fn from(value: StateUpdateKind) -> Self {
104                            let kind = value.kind.expect("kind should be set");
105                            // TODO: This requires that the json->proto->json roundtrips
106                            // exactly, see database-issues#7179.
107                            StateUpdateKindJson::from_serde(&kind)
108                        }
109                    }
110
111                    impl From<StateUpdateKindJson> for StateUpdateKind {
112                        fn from(value: StateUpdateKindJson) -> Self {
113                            let kind: state_update_kind::Kind = value.to_serde();
114                            StateUpdateKind { kind: Some(kind) }
115                        }
116                    }
117                }
118            )*
119
120            $(
121                pub(crate) mod [<objects_ $x>] {
122                    pub use mz_catalog_protos::[<objects_ $x>]::*;
123
124                    use crate::durable::objects::state_update::StateUpdateKindJson;
125
126                    impl From<StateUpdateKind> for StateUpdateKindJson {
127                        fn from(value: StateUpdateKind) -> Self {
128                            // TODO: This requires that the json->proto->json roundtrips
129                            // exactly, see database-issues#7179.
130                            StateUpdateKindJson::from_serde(&value)
131                        }
132                    }
133
134                    impl From<StateUpdateKindJson> for StateUpdateKind {
135                        fn from(value: StateUpdateKindJson) -> Self {
136                            value.to_serde()
137                        }
138                    }
139                }
140            )*
141
142            // Generate test helpers for each version.
143
144            #[cfg(test)]
145            #[derive(Debug, Arbitrary)]
146            enum AllVersionsStateUpdateKind {
147                $(
148                    [<$x_old:upper>](crate::durable::upgrade::[<objects_ $x_old>]::StateUpdateKind),
149                )*
150                $(
151                    [<$x:upper>](crate::durable::upgrade::[<objects_ $x>]::StateUpdateKind),
152                )*
153            }
154
155            #[cfg(test)]
156            impl AllVersionsStateUpdateKind {
157                #[cfg(test)]
158                fn arbitrary_vec(version: &str) -> Result<Vec<Self>, String> {
159                    let mut runner = proptest::test_runner::TestRunner::deterministic();
160                    std::iter::repeat(())
161                        .filter_map(|_| {
162                            AllVersionsStateUpdateKind::arbitrary(version, &mut runner)
163                                .transpose()
164                        })
165                        .take(ENCODED_TEST_CASES)
166                        .collect::<Result<_, _>>()
167                }
168
169                #[cfg(test)]
170                fn arbitrary(
171                    version: &str,
172                    runner: &mut proptest::test_runner::TestRunner,
173                ) -> Result<Option<Self>, String> {
174                    match version {
175                        $(
176                            concat!("objects_", stringify!($x_old)) => {
177                                let arbitrary_data =
178                                    crate::durable::upgrade
179                                        ::[<objects_ $x_old>]::StateUpdateKind::arbitrary()
180                                        .new_tree(runner)
181                                        .expect("unable to create arbitrary data")
182                                        .current();
183                                // Skip over generated data where kind is None
184                                // because they are not interesting or possible in
185                                // production. Unfortunately any of the inner fields
186                                // can still be None, which is also not possible in
187                                // production.
188                                // TODO(jkosh44) See if there's an arbitrary config
189                                // that forces Some.
190                                let arbitrary_data = if arbitrary_data.kind.is_some() {
191                                    Some(Self::[<$x_old:upper>](arbitrary_data))
192                                } else {
193                                    None
194                                };
195                                Ok(arbitrary_data)
196                            }
197                        )*
198                        $(
199                            concat!("objects_", stringify!($x)) => {
200                                let arbitrary_data =
201                                    crate::durable::upgrade
202                                        ::[<objects_ $x>]::StateUpdateKind::arbitrary()
203                                        .new_tree(runner)
204                                        .expect("unable to create arbitrary data")
205                                        .current();
206                                Ok(Some(Self::[<$x:upper>](arbitrary_data)))
207                            }
208                        )*
209                        _ => Err(format!("unrecognized version {version} add enum variant")),
210                    }
211                }
212
213                #[cfg(test)]
214                fn try_from_raw(version: &str, raw: StateUpdateKindJson) -> Result<Self, String> {
215                    match version {
216                        $(
217                            concat!("objects_", stringify!($x_old)) => Ok(Self::[<$x_old:upper>](raw.into())),
218                        )*
219                        $(
220                            concat!("objects_", stringify!($x)) => Ok(Self::[<$x:upper>](raw.into())),
221
222                        )*
223                        _ => Err(format!("unrecognized version {version} add enum variant")),
224                    }
225                }
226
227                #[cfg(test)]
228                fn raw(self) -> StateUpdateKindJson {
229                    match self {
230                        $(
231                            Self::[<$x_old:upper>](kind) => kind.into(),
232                        )*
233                        $(
234                            Self::[<$x:upper>](kind) => kind.into(),
235                        )*
236                    }
237                }
238            }
239        }
240    }
241}
242
243objects!(
244    [v74, v75, v76, v77, v78],
245    [v79, v80, v81, v82, v83, v84, v85, v86, v87, v88]
246);
247
248/// The current version of the `Catalog`.
249pub use mz_catalog_protos::CATALOG_VERSION;
250/// The minimum `Catalog` version number that we support migrating from.
251pub use mz_catalog_protos::MIN_CATALOG_VERSION;
252
253// Note(parkmycar): Ideally we wouldn't have to define these extra constants,
254// but const expressions aren't yet supported in match statements.
255const TOO_OLD_VERSION: u64 = MIN_CATALOG_VERSION - 1;
256const FUTURE_VERSION: u64 = CATALOG_VERSION + 1;
257
258mod v74_to_v75;
259mod v75_to_v76;
260mod v76_to_v77;
261mod v77_to_v78;
262mod v78_to_v79;
263mod v79_to_v80;
264mod v80_to_v81;
265mod v81_to_v82;
266mod v82_to_v83;
267mod v83_to_v84;
268mod v84_to_v85;
269mod v85_to_v86;
270mod v86_to_v87;
271mod v87_to_v88;
272
273/// Describes a single action to take during a migration from `V1` to `V2`.
274#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
275enum MigrationAction<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> {
276    /// Deletes the provided key.
277    #[allow(unused)]
278    Delete(V1),
279    /// Inserts the provided key-value pair. The key must not currently exist!
280    #[allow(unused)]
281    Insert(V2),
282    /// Update the key-value pair for the provided key.
283    #[allow(unused)]
284    Update(V1, V2),
285}
286
287impl<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> MigrationAction<V1, V2> {
288    /// Converts `self` into a `Vec<StateUpdate<StateUpdateKindBinary>>` that can be appended
289    /// to persist.
290    fn into_updates(self) -> Vec<(StateUpdateKindJson, Diff)> {
291        match self {
292            MigrationAction::Delete(kind) => {
293                vec![(kind.into(), Diff::MINUS_ONE)]
294            }
295            MigrationAction::Insert(kind) => {
296                vec![(kind.into(), Diff::ONE)]
297            }
298            MigrationAction::Update(old_kind, new_kind) => {
299                vec![
300                    (old_kind.into(), Diff::MINUS_ONE),
301                    (new_kind.into(), Diff::ONE),
302                ]
303            }
304        }
305    }
306}
307
308/// Upgrades the data in the catalog to version [`CATALOG_VERSION`].
309///
310/// Returns the current upper after all migrations have executed.
311#[mz_ore::instrument(name = "persist::upgrade", level = "debug")]
312pub(crate) async fn upgrade(
313    persist_handle: &mut UnopenedPersistCatalogState,
314    mut commit_ts: Timestamp,
315) -> Result<Timestamp, CatalogError> {
316    soft_assert_ne_or_log!(
317        persist_handle.upper,
318        Timestamp::minimum(),
319        "cannot upgrade uninitialized catalog"
320    );
321
322    // Consolidate to avoid migrating old state.
323    persist_handle.consolidate();
324    let mut version = persist_handle
325        .get_user_version()
326        .await?
327        .expect("initialized catalog must have a version");
328    // Run migrations until we're up-to-date.
329    while version < CATALOG_VERSION {
330        (version, commit_ts) = run_upgrade(persist_handle, version, commit_ts).await?;
331    }
332
333    Ok(commit_ts)
334}
335
336/// Determines which upgrade to run for the `version` and executes it.
337///
338/// Returns the new version and upper.
339async fn run_upgrade(
340    unopened_catalog_state: &mut UnopenedPersistCatalogState,
341    version: u64,
342    commit_ts: Timestamp,
343) -> Result<(u64, Timestamp), CatalogError> {
344    let incompatible = DurableCatalogError::IncompatibleDataVersion {
345        found_version: version,
346        min_catalog_version: MIN_CATALOG_VERSION,
347        catalog_version: CATALOG_VERSION,
348    }
349    .into();
350
351    match version {
352        ..=TOO_OLD_VERSION => Err(incompatible),
353
354        74 => {
355            run_versioned_upgrade(
356                unopened_catalog_state,
357                version,
358                commit_ts,
359                v74_to_v75::upgrade,
360            )
361            .await
362        }
363        75 => {
364            run_versioned_upgrade(
365                unopened_catalog_state,
366                version,
367                commit_ts,
368                v75_to_v76::upgrade,
369            )
370            .await
371        }
372        76 => {
373            run_versioned_upgrade(
374                unopened_catalog_state,
375                version,
376                commit_ts,
377                v76_to_v77::upgrade,
378            )
379            .await
380        }
381        77 => {
382            run_versioned_upgrade(
383                unopened_catalog_state,
384                version,
385                commit_ts,
386                v77_to_v78::upgrade,
387            )
388            .await
389        }
390        78 => {
391            run_versioned_upgrade(
392                unopened_catalog_state,
393                version,
394                commit_ts,
395                v78_to_v79::upgrade,
396            )
397            .await
398        }
399        79 => {
400            run_versioned_upgrade(
401                unopened_catalog_state,
402                version,
403                commit_ts,
404                v79_to_v80::upgrade,
405            )
406            .await
407        }
408        80 => {
409            run_versioned_upgrade(
410                unopened_catalog_state,
411                version,
412                commit_ts,
413                v80_to_v81::upgrade,
414            )
415            .await
416        }
417        81 => {
418            run_versioned_upgrade(
419                unopened_catalog_state,
420                version,
421                commit_ts,
422                v81_to_v82::upgrade,
423            )
424            .await
425        }
426        // v82→v83 is a one-shot byte-level repair, not a proto evolution. It
427        // needs raw access to the snapshot's diffs (which `run_versioned_upgrade`
428        // strips) so it plugs into `run_upgrade` directly.
429        82 => v82_to_v83::upgrade(unopened_catalog_state, commit_ts).await,
430        83 => v83_to_v84::upgrade(unopened_catalog_state, commit_ts).await,
431        84 => {
432            run_versioned_upgrade(
433                unopened_catalog_state,
434                version,
435                commit_ts,
436                v84_to_v85::upgrade,
437            )
438            .await
439        }
440        85 => {
441            run_versioned_upgrade(
442                unopened_catalog_state,
443                version,
444                commit_ts,
445                v85_to_v86::upgrade,
446            )
447            .await
448        }
449        86 => {
450            run_versioned_upgrade(
451                unopened_catalog_state,
452                version,
453                commit_ts,
454                v86_to_v87::upgrade,
455            )
456            .await
457        }
458        87 => {
459            run_versioned_upgrade(
460                unopened_catalog_state,
461                version,
462                commit_ts,
463                v87_to_v88::upgrade,
464            )
465            .await
466        }
467        // Up-to-date, no migration needed!
468        CATALOG_VERSION => Ok((CATALOG_VERSION, commit_ts)),
469        FUTURE_VERSION.. => Err(incompatible),
470    }
471}
472
473/// Runs `migration_logic` on the contents of the current catalog assuming a current version of
474/// `current_version`.
475///
476/// Returns the new version and upper.
477async fn run_versioned_upgrade<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson>(
478    unopened_catalog_state: &mut UnopenedPersistCatalogState,
479    current_version: u64,
480    mut commit_ts: Timestamp,
481    migration_logic: impl FnOnce(Vec<V1>) -> Vec<MigrationAction<V1, V2>>,
482) -> Result<(u64, Timestamp), CatalogError> {
483    tracing::info!(current_version, "running versioned Catalog upgrade");
484
485    // 1. Use the V1 to deserialize the contents of the current snapshot.
486    let snapshot: Vec<_> = unopened_catalog_state
487        .snapshot
488        .iter()
489        .map(|(kind, ts, diff)| {
490            soft_assert_eq_or_log!(
491                *diff,
492                Diff::ONE,
493                "snapshot is consolidated, ({kind:?}, {ts:?}, {diff:?})"
494            );
495            V1::try_from(kind.clone()).expect("invalid catalog data persisted")
496        })
497        .collect();
498
499    // 2. Generate updates from version specific migration logic.
500    let migration_actions = migration_logic(snapshot);
501    let mut updates: Vec<_> = migration_actions
502        .into_iter()
503        .flat_map(|action| action.into_updates().into_iter())
504        .collect();
505    // Validate that we're not migrating an un-migratable collection.
506    for (update, _) in &updates {
507        if update.is_always_deserializable() {
508            panic!("migration to un-migratable collection: {update:?}\nall updates: {updates:?}");
509        }
510    }
511
512    // 3. Add a retraction for old version and insertion for new version into updates.
513    let next_version = current_version + 1;
514    let version_retraction = (version_update_kind(current_version), Diff::MINUS_ONE);
515    updates.push(version_retraction);
516    let version_insertion = (version_update_kind(next_version), Diff::ONE);
517    updates.push(version_insertion);
518
519    // 4. Apply migration to catalog.
520    if matches!(unopened_catalog_state.mode, Mode::Writable) {
521        commit_ts = unopened_catalog_state
522            .compare_and_append(updates, commit_ts)
523            .await
524            .map_err(|e| e.unwrap_fence_error())?;
525    } else {
526        let ts = commit_ts;
527        let updates = updates
528            .into_iter()
529            .map(|(kind, diff)| StateUpdate { kind, ts, diff });
530        commit_ts = commit_ts.step_forward();
531        unopened_catalog_state.apply_updates_and_consolidate(updates)?;
532    }
533
534    // 5. Consolidate snapshot to remove old versions.
535    unopened_catalog_state.consolidate();
536
537    Ok((next_version, commit_ts))
538}
539
540/// Generates a [`proto::StateUpdateKind`] to update the user version.
541fn version_update_kind(version: u64) -> StateUpdateKindJson {
542    // We can use the current version because Configs can never be migrated and are always wire
543    // compatible.
544    StateUpdateKind::Config(
545        proto::ConfigKey {
546            key: USER_VERSION_KEY.to_string(),
547        },
548        proto::ConfigValue { value: version },
549    )
550    .into()
551}