// mz_catalog/durable/upgrade.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! This module contains all the helpers and code paths for upgrading/migrating the `Catalog`.
//!
//! We facilitate migrations by keeping snapshots of the objects we previously stored, and relying
//! entirely on these snapshots. These snapshots exist in the [`mz_catalog_protos`] crate in the
//! form of `catalog-protos/protos/objects_vXX.proto`. By maintaining and relying on snapshots we
//! don't have to worry about changes elsewhere in the codebase affecting our migrations, because
//! our application and serialization logic is decoupled, and the objects of the Catalog for a
//! given version are "frozen in time".
//!
//! > **Note**: The protobuf snapshot files themselves live in a separate crate because it takes a
//!             relatively significant amount of time to codegen and build them. By placing them in
//!             a separate crate we don't have to pay this compile time cost when building the
//!             catalog, allowing for faster iteration.
//!
//! You cannot make any changes to the following messages or anything that they depend on:
//!
//!   - Config
//!   - Setting
//!   - FenceToken
//!   - AuditLog
//!
//! When you want to make a change to the `Catalog`, you need to follow these steps:
//!
//! 1. Check the current [`CATALOG_VERSION`] and make sure an `objects_v<CATALOG_VERSION>.proto`
//!    file exists. If one doesn't, copy and paste the current `objects.proto` file, renaming it to
//!    `objects_v<CATALOG_VERSION>.proto`.
//! 2. Bump [`CATALOG_VERSION`] by one.
//! 3. Make your changes to `objects.proto`.
//! 4. Copy and paste `objects.proto`, naming the copy `objects_v<CATALOG_VERSION>.proto`. Update
//!    the package name of the `.proto` to `package objects_v<CATALOG_VERSION>;`.
//! 5. We should now have a copy of the protobuf objects as they currently exist, and a copy of
//!    how we want them to exist. For example, if the version of the Catalog before we made our
//!    changes was 15, we should now have `objects_v15.proto` and `objects_v16.proto`.
//! 6. Rebuild Materialize, which will error because the hashes stored in
//!    `src/catalog-protos/protos/hashes.json` have now changed. Update these to match the new
//!    hashes for `objects.proto` and `objects_v<CATALOG_VERSION>.proto`.
//! 7. Add `v<CATALOG_VERSION>` to the call to the `objects!` macro in this file, and to the
//!    `proto_objects!` macro in the [`mz_catalog_protos`] crate.
//! 8. Add a new file to `catalog/src/durable/upgrade`, which is where we'll put the new migration
//!    path.
//! 9. Write upgrade functions using the two versions of the protos we now have, e.g.
//!    `objects_v15.proto` and `objects_v16.proto`. In this migration code you __should not__
//!    import any defaults or constants from elsewhere in the codebase, because a future change
//!    could then impact a previous migration.
//! 10. Add a module declaration for your new migration to this file:
//!     `mod v<CATALOG_VERSION-1>_to_v<CATALOG_VERSION>;`
//! 11. Call your upgrade function in [`run_upgrade()`].
//! 12. Generate a test file for the new version:
//!     ```ignore
//!     cargo test --package mz-catalog --lib durable::upgrade::tests::generate_missing_encodings -- --ignored
//!     ```
//!
//! When in doubt, reach out to the Surfaces team, and we'll be more than happy to help :)

63#[cfg(test)]
64mod tests;
65pub mod wire_compatible;
66
67use mz_ore::{soft_assert_eq_or_log, soft_assert_ne_or_log};
68use mz_repr::Diff;
69use paste::paste;
70#[cfg(test)]
71use proptest::prelude::*;
72#[cfg(test)]
73use proptest::strategy::ValueTree;
74#[cfg(test)]
75use proptest_derive::Arbitrary;
76use timely::progress::Timestamp as TimelyTimestamp;
77
78use crate::durable::initialize::USER_VERSION_KEY;
79use crate::durable::objects::serialization::proto;
80use crate::durable::objects::state_update::{
81    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
82};
83use crate::durable::persist::{Mode, Timestamp, UnopenedPersistCatalogState};
84use crate::durable::{CatalogError, DurableCatalogError};
85
/// Number of arbitrary `StateUpdateKind` values the test helpers generate per catalog version.
#[cfg(test)]
const ENCODED_TEST_CASES: usize = 100;

/// Declares one `objects_vXX` module per listed catalog version.
///
/// Each generated module re-exports the protobuf snapshot types for that version from
/// [`mz_catalog_protos`] and implements conversions between its `StateUpdateKind` and
/// [`StateUpdateKindJson`].
///
/// In test builds the macro additionally generates `AllVersionsStateUpdateKind`. It is an enum
/// with one variant per listed version, plus helpers that produce arbitrary values for a version
/// and convert them to and from raw JSON.
macro_rules! objects {
    ( $( $x:ident ),* ) => {
        paste! {
            $(
                pub(crate) mod [<objects_ $x>] {
                    pub use mz_catalog_protos::[<objects_ $x>]::*;

                    use crate::durable::objects::state_update::StateUpdateKindJson;

                    impl From<StateUpdateKind> for StateUpdateKindJson {
                        fn from(update: StateUpdateKind) -> Self {
                            // TODO: This requires that the json->proto->json roundtrips
                            // exactly, see database-issues#7179.
                            let inner = update.kind.expect("kind should be set");
                            Self::from_serde(&inner)
                        }
                    }

                    impl TryFrom<StateUpdateKindJson> for StateUpdateKind {
                        type Error = String;

                        fn try_from(json: StateUpdateKindJson) -> Result<Self, Self::Error> {
                            let inner: state_update_kind::Kind = json.to_serde();
                            Ok(Self { kind: Some(inner) })
                        }
                    }
                }
            )*

            // Test-only helpers that operate uniformly over every listed version.

            #[cfg(test)]
            #[derive(Debug, Arbitrary)]
            enum AllVersionsStateUpdateKind {
                $(
                    [<$x:upper>](crate::durable::upgrade::[<objects_ $x>]::StateUpdateKind),
                )*
            }

            #[cfg(test)]
            impl AllVersionsStateUpdateKind {
                /// Deterministically generates `ENCODED_TEST_CASES` arbitrary values for
                /// `version` (a module name such as `"objects_v67"`).
                ///
                /// Fails with the first error reported by [`Self::arbitrary`].
                fn arbitrary_vec(version: &str) -> Result<Vec<Self>, String> {
                    let mut runner = proptest::test_runner::TestRunner::deterministic();
                    let mut generated = Vec::with_capacity(ENCODED_TEST_CASES);
                    while generated.len() < ENCODED_TEST_CASES {
                        // `None` marks a rejected sample; keep drawing until we have enough.
                        if let Some(kind) = Self::arbitrary(version, &mut runner)? {
                            generated.push(kind);
                        }
                    }
                    Ok(generated)
                }

                /// Generates a single arbitrary value for `version` using `runner`.
                ///
                /// Returns `Ok(None)` when the generated value has no `kind` set, and an error
                /// when `version` does not name one of the listed versions.
                fn arbitrary(
                    version: &str,
                    runner: &mut proptest::test_runner::TestRunner,
                ) -> Result<Option<Self>, String> {
                    match version {
                        $(
                            concat!("objects_", stringify!($x)) => {
                                let sample =
                                    crate::durable::upgrade::[<objects_ $x>]::StateUpdateKind::arbitrary()
                                        .new_tree(runner)
                                        .expect("unable to create arbitrary data")
                                        .current();
                                // Skip over generated data where kind is None because they are not interesting or
                                // possible in production. Unfortunately any of the inner fields can still be None,
                                // which is also not possible in production.
                                // TODO(jkosh44) See if there's an arbitrary config that forces Some.
                                Ok(sample.kind.is_some().then(|| Self::[<$x:upper>](sample)))
                            }
                        )*
                        _ => Err(format!("unrecognized version {version} add enum variant")),
                    }
                }

                /// Decodes `json` with the `StateUpdateKind` type belonging to `version`.
                fn try_from_raw(version: &str, json: StateUpdateKindJson) -> Result<Self, String> {
                    match version {
                        $(
                            concat!("objects_", stringify!($x)) => Ok(Self::[<$x:upper>](json.try_into()?)),
                        )*
                        _ => Err(format!("unrecognized version {version} add enum variant")),
                    }
                }

                /// Encodes `self` back into its raw JSON representation.
                fn raw(self) -> StateUpdateKindJson {
                    match self {
                        $(
                            Self::[<$x:upper>](kind) => kind.into(),
                        )*
                    }
                }
            }
        }
    }
}

191objects!(v67, v68, v69, v70, v71, v72, v73, v74, v75, v76, v77, v78);
192
193/// The current version of the `Catalog`.
194pub use mz_catalog_protos::CATALOG_VERSION;
195/// The minimum `Catalog` version number that we support migrating from.
196pub use mz_catalog_protos::MIN_CATALOG_VERSION;
197
198// Note(parkmycar): Ideally we wouldn't have to define these extra constants,
199// but const expressions aren't yet supported in match statements.
200const TOO_OLD_VERSION: u64 = MIN_CATALOG_VERSION - 1;
201const FUTURE_VERSION: u64 = CATALOG_VERSION + 1;
202
203mod v67_to_v68;
204mod v68_to_v69;
205mod v69_to_v70;
206mod v70_to_v71;
207mod v71_to_v72;
208mod v72_to_v73;
209mod v73_to_v74;
210mod v74_to_v75;
211mod v75_to_v76;
212mod v76_to_v77;
213mod v77_to_v78;
214
215/// Describes a single action to take during a migration from `V1` to `V2`.
216#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
217enum MigrationAction<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> {
218    /// Deletes the provided key.
219    #[allow(unused)]
220    Delete(V1),
221    /// Inserts the provided key-value pair. The key must not currently exist!
222    #[allow(unused)]
223    Insert(V2),
224    /// Update the key-value pair for the provided key.
225    #[allow(unused)]
226    Update(V1, V2),
227}
228
229impl<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> MigrationAction<V1, V2> {
230    /// Converts `self` into a `Vec<StateUpdate<StateUpdateKindBinary>>` that can be appended
231    /// to persist.
232    fn into_updates(self) -> Vec<(StateUpdateKindJson, Diff)> {
233        match self {
234            MigrationAction::Delete(kind) => {
235                vec![(kind.into(), Diff::MINUS_ONE)]
236            }
237            MigrationAction::Insert(kind) => {
238                vec![(kind.into(), Diff::ONE)]
239            }
240            MigrationAction::Update(old_kind, new_kind) => {
241                vec![
242                    (old_kind.into(), Diff::MINUS_ONE),
243                    (new_kind.into(), Diff::ONE),
244                ]
245            }
246        }
247    }
248}
249
250/// Upgrades the data in the catalog to version [`CATALOG_VERSION`].
251///
252/// Returns the current upper after all migrations have executed.
253#[mz_ore::instrument(name = "persist::upgrade", level = "debug")]
254pub(crate) async fn upgrade(
255    persist_handle: &mut UnopenedPersistCatalogState,
256    mut commit_ts: Timestamp,
257) -> Result<Timestamp, CatalogError> {
258    soft_assert_ne_or_log!(
259        persist_handle.upper,
260        Timestamp::minimum(),
261        "cannot upgrade uninitialized catalog"
262    );
263
264    // Consolidate to avoid migrating old state.
265    persist_handle.consolidate();
266    let mut version = persist_handle
267        .get_user_version()
268        .await?
269        .expect("initialized catalog must have a version");
270    // Run migrations until we're up-to-date.
271    while version < CATALOG_VERSION {
272        (version, commit_ts) = run_upgrade(persist_handle, version, commit_ts).await?;
273    }
274
275    Ok(commit_ts)
276}
277
278/// Determines which upgrade to run for the `version` and executes it.
279///
280/// Returns the new version and upper.
281async fn run_upgrade(
282    unopened_catalog_state: &mut UnopenedPersistCatalogState,
283    version: u64,
284    commit_ts: Timestamp,
285) -> Result<(u64, Timestamp), CatalogError> {
286    let incompatible = DurableCatalogError::IncompatibleDataVersion {
287        found_version: version,
288        min_catalog_version: MIN_CATALOG_VERSION,
289        catalog_version: CATALOG_VERSION,
290    }
291    .into();
292
293    match version {
294        ..=TOO_OLD_VERSION => Err(incompatible),
295
296        67 => {
297            run_versioned_upgrade(
298                unopened_catalog_state,
299                version,
300                commit_ts,
301                v67_to_v68::upgrade,
302            )
303            .await
304        }
305        68 => {
306            run_versioned_upgrade(
307                unopened_catalog_state,
308                version,
309                commit_ts,
310                v68_to_v69::upgrade,
311            )
312            .await
313        }
314        69 => {
315            run_versioned_upgrade(
316                unopened_catalog_state,
317                version,
318                commit_ts,
319                v69_to_v70::upgrade,
320            )
321            .await
322        }
323        70 => {
324            run_versioned_upgrade(
325                unopened_catalog_state,
326                version,
327                commit_ts,
328                v70_to_v71::upgrade,
329            )
330            .await
331        }
332        71 => {
333            run_versioned_upgrade(
334                unopened_catalog_state,
335                version,
336                commit_ts,
337                v71_to_v72::upgrade,
338            )
339            .await
340        }
341        72 => {
342            run_versioned_upgrade(
343                unopened_catalog_state,
344                version,
345                commit_ts,
346                v72_to_v73::upgrade,
347            )
348            .await
349        }
350        73 => {
351            run_versioned_upgrade(
352                unopened_catalog_state,
353                version,
354                commit_ts,
355                v73_to_v74::upgrade,
356            )
357            .await
358        }
359        74 => {
360            run_versioned_upgrade(
361                unopened_catalog_state,
362                version,
363                commit_ts,
364                v74_to_v75::upgrade,
365            )
366            .await
367        }
368        75 => {
369            run_versioned_upgrade(
370                unopened_catalog_state,
371                version,
372                commit_ts,
373                v75_to_v76::upgrade,
374            )
375            .await
376        }
377        76 => {
378            run_versioned_upgrade(
379                unopened_catalog_state,
380                version,
381                commit_ts,
382                v76_to_v77::upgrade,
383            )
384            .await
385        }
386        77 => {
387            run_versioned_upgrade(
388                unopened_catalog_state,
389                version,
390                commit_ts,
391                v77_to_v78::upgrade,
392            )
393            .await
394        }
395
396        // Up-to-date, no migration needed!
397        CATALOG_VERSION => Ok((CATALOG_VERSION, commit_ts)),
398        FUTURE_VERSION.. => Err(incompatible),
399    }
400}
401
402/// Runs `migration_logic` on the contents of the current catalog assuming a current version of
403/// `current_version`.
404///
405/// Returns the new version and upper.
406async fn run_versioned_upgrade<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson>(
407    unopened_catalog_state: &mut UnopenedPersistCatalogState,
408    current_version: u64,
409    mut commit_ts: Timestamp,
410    migration_logic: impl FnOnce(Vec<V1>) -> Vec<MigrationAction<V1, V2>>,
411) -> Result<(u64, Timestamp), CatalogError> {
412    tracing::info!(current_version, "running versioned Catalog upgrade");
413
414    // 1. Use the V1 to deserialize the contents of the current snapshot.
415    let snapshot: Vec<_> = unopened_catalog_state
416        .snapshot
417        .iter()
418        .map(|(kind, ts, diff)| {
419            soft_assert_eq_or_log!(
420                *diff,
421                Diff::ONE,
422                "snapshot is consolidated, ({kind:?}, {ts:?}, {diff:?})"
423            );
424            V1::try_from(kind.clone()).expect("invalid catalog data persisted")
425        })
426        .collect();
427
428    // 2. Generate updates from version specific migration logic.
429    let migration_actions = migration_logic(snapshot);
430    let mut updates: Vec<_> = migration_actions
431        .into_iter()
432        .flat_map(|action| action.into_updates().into_iter())
433        .collect();
434    // Validate that we're not migrating an un-migratable collection.
435    for (update, _) in &updates {
436        if update.is_always_deserializable() {
437            panic!("migration to un-migratable collection: {update:?}\nall updates: {updates:?}");
438        }
439    }
440
441    // 3. Add a retraction for old version and insertion for new version into updates.
442    let next_version = current_version + 1;
443    let version_retraction = (version_update_kind(current_version), Diff::MINUS_ONE);
444    updates.push(version_retraction);
445    let version_insertion = (version_update_kind(next_version), Diff::ONE);
446    updates.push(version_insertion);
447
448    // 4. Apply migration to catalog.
449    if matches!(unopened_catalog_state.mode, Mode::Writable) {
450        commit_ts = unopened_catalog_state
451            .compare_and_append(updates, commit_ts)
452            .await
453            .map_err(|e| e.unwrap_fence_error())?;
454    } else {
455        let ts = commit_ts;
456        let updates = updates
457            .into_iter()
458            .map(|(kind, diff)| StateUpdate { kind, ts, diff });
459        commit_ts = commit_ts.step_forward();
460        unopened_catalog_state.apply_updates(updates)?;
461    }
462
463    // 5. Consolidate snapshot to remove old versions.
464    unopened_catalog_state.consolidate();
465
466    Ok((next_version, commit_ts))
467}
468
469/// Generates a [`proto::StateUpdateKind`] to update the user version.
470fn version_update_kind(version: u64) -> StateUpdateKindJson {
471    // We can use the current version because Configs can never be migrated and are always wire
472    // compatible.
473    StateUpdateKind::Config(
474        proto::ConfigKey {
475            key: USER_VERSION_KEY.to_string(),
476        },
477        proto::ConfigValue { value: version },
478    )
479    .into()
480}