mz_catalog/durable/
upgrade.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module contains all the helpers and code paths for upgrading/migrating the `Catalog`.
11//!
//! We facilitate migrations by keeping snapshots of the objects we previously stored, and relying
//! entirely on these snapshots. These snapshots exist in the [`mz_catalog_protos`] crate in the
//! form of `catalog-protos/protos/objects_vXX.proto`. By maintaining and relying on snapshots we
//! don't have to worry about changes elsewhere in the codebase affecting our migrations because
//! our application and serialization logic is decoupled, and the objects of the Catalog for a
//! given version are "frozen in time".
18//!
19//! > **Note**: The protobuf snapshot files themselves live in a separate crate because it takes a
20//!             relatively significant amount of time to codegen and build them. By placing them in
21//!             a separate crate we don't have to pay this compile time cost when building the
22//!             catalog, allowing for faster iteration.
23//!
//! You cannot make any changes to the following messages or anything that they depend on:
25//!
26//!   - Config
27//!   - Setting
28//!   - FenceToken
29//!   - AuditLog
30//!
31//! When you want to make a change to the `Catalog` you need to follow these steps:
32//!
33//! 1. Check the current [`CATALOG_VERSION`], make sure an `objects_v<CATALOG_VERSION>.proto` file
34//!    exists. If one doesn't, copy and paste the current `objects.proto` file, renaming it to
35//!    `objects_v<CATALOG_VERSION>.proto`.
36//! 2. Bump [`CATALOG_VERSION`] by one.
37//! 3. Make your changes to `objects.proto`.
38//! 4. Copy and paste `objects.proto`, naming the copy `objects_v<CATALOG_VERSION>.proto`. Update
39//!    the package name of the `.proto` to `package objects_v<CATALOG_VERSION>;`
40//! 5. We should now have a copy of the protobuf objects as they currently exist, and a copy of
41//!    how we want them to exist. For example, if the version of the Catalog before we made our
42//!    changes was 15, we should now have `objects_v15.proto` and `objects_v16.proto`.
43//! 6. Rebuild Materialize which will error because the hashes stored in
44//!    `src/catalog-protos/protos/hashes.json` have now changed. Update these to match the new
45//!    hashes for objects.proto and `objects_v<CATALOG_VERSION>.proto`.
46//! 7. Add `v<CATALOG_VERSION>` to the call to the `objects!` macro in this file, and to the
47//!    `proto_objects!` macro in the [`mz_catalog_protos`] crate.
48//! 8. Add a new file to `catalog/src/durable/upgrade` which is where we'll put the new migration
49//!    path.
//! 9. Write upgrade functions using the two versions of the protos we now have, e.g.
//!    `objects_v15.proto` and `objects_v16.proto`. In this migration code you __should not__
//!    import any defaults or constants from elsewhere in the codebase, because a future
//!    change could then impact a previous migration.
//! 10. Add an import for your new module to this file: `mod v<CATALOG_VERSION-1>_to_v<CATALOG_VERSION>;`
55//! 11. Call your upgrade function in [`run_upgrade()`].
56//! 12. Generate a test file for the new version:
57//!     ```ignore
58//!     cargo test --package mz-catalog --lib durable::upgrade::tests::generate_missing_encodings -- --ignored
59//!     ```
60//!
61//! When in doubt, reach out to the Surfaces team, and we'll be more than happy to help :)
62
63pub mod json_compatible;
64#[cfg(test)]
65mod tests;
66
67use mz_ore::{soft_assert_eq_or_log, soft_assert_ne_or_log};
68use mz_repr::Diff;
69use paste::paste;
70#[cfg(test)]
71use proptest::prelude::*;
72#[cfg(test)]
73use proptest::strategy::ValueTree;
74#[cfg(test)]
75use proptest_derive::Arbitrary;
76use timely::progress::Timestamp as TimelyTimestamp;
77
78use crate::durable::initialize::USER_VERSION_KEY;
79use crate::durable::objects::serialization::proto;
80use crate::durable::objects::state_update::{
81    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
82};
83use crate::durable::persist::{Mode, Timestamp, UnopenedPersistCatalogState};
84use crate::durable::{CatalogError, DurableCatalogError};
85
86#[cfg(test)]
87const ENCODED_TEST_CASES: usize = 100;
88
/// Generates the per-version support code for every supported catalog snapshot version.
///
/// The invocation takes two bracketed, comma-separated lists of version identifiers. The first
/// list names the pre-v79 versions, whose objects had a protobuf-generated format: their
/// `StateUpdateKind` stores the actual update in an optional `kind` field, and that additional
/// level of indirection has to be removed (and re-added) when converting to and from
/// `StateUpdateKindJson`. The second list names the later versions, whose `StateUpdateKind` is
/// converted directly.
///
/// For every listed version `vN` this generates a `pub(crate) mod objects_vN` that re-exports
/// `mz_catalog_protos::objects_vN` and implements both `From` conversions between that version's
/// `StateUpdateKind` and `StateUpdateKindJson`. In test builds it also generates
/// `AllVersionsStateUpdateKind`, an enum with one variant per version, plus helpers to produce
/// arbitrary values for a version and to convert values to and from their raw JSON form.
macro_rules! objects {
    ( [$( $legacy:ident ),*], [$( $modern:ident ),*] ) => {
        paste! {
            // Versions whose `StateUpdateKind` keeps the update inside an optional `kind` field.
            $(
                pub(crate) mod [<objects_ $legacy>] {
                    pub use mz_catalog_protos::[<objects_ $legacy>]::*;

                    use crate::durable::objects::state_update::StateUpdateKindJson;

                    impl From<StateUpdateKind> for StateUpdateKindJson {
                        fn from(update: StateUpdateKind) -> Self {
                            // TODO: This requires that the json->proto->json roundtrips
                            // exactly, see database-issues#7179.
                            let inner = update.kind.expect("kind should be set");
                            Self::from_serde(&inner)
                        }
                    }

                    impl From<StateUpdateKindJson> for StateUpdateKind {
                        fn from(json: StateUpdateKindJson) -> Self {
                            let inner: state_update_kind::Kind = json.to_serde();
                            Self { kind: Some(inner) }
                        }
                    }
                }
            )*

            // Versions whose `StateUpdateKind` is (de)serialized as-is.
            $(
                pub(crate) mod [<objects_ $modern>] {
                    pub use mz_catalog_protos::[<objects_ $modern>]::*;

                    use crate::durable::objects::state_update::StateUpdateKindJson;

                    impl From<StateUpdateKind> for StateUpdateKindJson {
                        fn from(update: StateUpdateKind) -> Self {
                            // TODO: This requires that the json->proto->json roundtrips
                            // exactly, see database-issues#7179.
                            Self::from_serde(&update)
                        }
                    }

                    impl From<StateUpdateKindJson> for StateUpdateKind {
                        fn from(json: StateUpdateKindJson) -> Self {
                            json.to_serde()
                        }
                    }
                }
            )*

            // Test-only helpers covering every version.

            /// A `StateUpdateKind` of any supported version; the variant names are the
            /// upper-cased version identifiers.
            #[cfg(test)]
            #[derive(Debug, Arbitrary)]
            enum AllVersionsStateUpdateKind {
                $(
                    [<$legacy:upper>](crate::durable::upgrade::[<objects_ $legacy>]::StateUpdateKind),
                )*
                $(
                    [<$modern:upper>](crate::durable::upgrade::[<objects_ $modern>]::StateUpdateKind),
                )*
            }

            #[cfg(test)]
            impl AllVersionsStateUpdateKind {
                /// Produces `ENCODED_TEST_CASES` arbitrary values for `version` using a
                /// deterministic test runner, skipping candidates that `arbitrary` rejects.
                ///
                /// Returns an error if `version` is not recognized.
                fn arbitrary_vec(version: &str) -> Result<Vec<Self>, String> {
                    let mut runner = proptest::test_runner::TestRunner::deterministic();
                    std::iter::repeat_with(|| Self::arbitrary(version, &mut runner))
                        .filter_map(Result::transpose)
                        .take(ENCODED_TEST_CASES)
                        .collect()
                }

                /// Produces one arbitrary value for `version`, where `version` is the module
                /// name (`objects_vN`).
                ///
                /// Returns `Ok(None)` when the generated value should be skipped and an error
                /// if `version` is not recognized.
                fn arbitrary(
                    version: &str,
                    runner: &mut proptest::test_runner::TestRunner,
                ) -> Result<Option<Self>, String> {
                    match version {
                        $(
                            concat!("objects_", stringify!($legacy)) => {
                                let candidate =
                                    crate::durable::upgrade::[<objects_ $legacy>]::StateUpdateKind::arbitrary()
                                        .new_tree(runner)
                                        .expect("unable to create arbitrary data")
                                        .current();
                                // Skip over generated data where kind is None because they are not interesting or
                                // possible in production. Unfortunately any of the inner fields can still be None,
                                // which is also not possible in production.
                                // TODO(jkosh44) See if there's an arbitrary config that forces Some.
                                Ok(candidate
                                    .kind
                                    .is_some()
                                    .then(|| Self::[<$legacy:upper>](candidate)))
                            }
                        )*
                        $(
                            concat!("objects_", stringify!($modern)) => {
                                let candidate =
                                    crate::durable::upgrade::[<objects_ $modern>]::StateUpdateKind::arbitrary()
                                        .new_tree(runner)
                                        .expect("unable to create arbitrary data")
                                        .current();
                                Ok(Some(Self::[<$modern:upper>](candidate)))
                            }
                        )*
                        _ => Err(format!("unrecognized version {version} add enum variant")),
                    }
                }

                /// Converts raw JSON into the `StateUpdateKind` of `version`, wrapped in the
                /// matching variant. Returns an error if `version` is not recognized.
                fn try_from_raw(version: &str, raw: StateUpdateKindJson) -> Result<Self, String> {
                    match version {
                        $(
                            concat!("objects_", stringify!($legacy)) => {
                                Ok(Self::[<$legacy:upper>](raw.into()))
                            }
                        )*
                        $(
                            concat!("objects_", stringify!($modern)) => {
                                Ok(Self::[<$modern:upper>](raw.into()))
                            }
                        )*
                        _ => Err(format!("unrecognized version {version} add enum variant")),
                    }
                }

                /// Converts the wrapped value back into its raw JSON form.
                fn raw(self) -> StateUpdateKindJson {
                    match self {
                        $(
                            Self::[<$legacy:upper>](inner) => inner.into(),
                        )*
                        $(
                            Self::[<$modern:upper>](inner) => inner.into(),
                        )*
                    }
                }
            }
        }
    }
}
235
236objects!([v74, v75, v76, v77, v78], [v79, v80]);
237
238/// The current version of the `Catalog`.
239pub use mz_catalog_protos::CATALOG_VERSION;
240/// The minimum `Catalog` version number that we support migrating from.
241pub use mz_catalog_protos::MIN_CATALOG_VERSION;
242
243// Note(parkmycar): Ideally we wouldn't have to define these extra constants,
244// but const expressions aren't yet supported in match statements.
245const TOO_OLD_VERSION: u64 = MIN_CATALOG_VERSION - 1;
246const FUTURE_VERSION: u64 = CATALOG_VERSION + 1;
247
248mod v74_to_v75;
249mod v75_to_v76;
250mod v76_to_v77;
251mod v77_to_v78;
252mod v78_to_v79;
253mod v79_to_v80;
254
255/// Describes a single action to take during a migration from `V1` to `V2`.
256#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
257enum MigrationAction<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> {
258    /// Deletes the provided key.
259    #[allow(unused)]
260    Delete(V1),
261    /// Inserts the provided key-value pair. The key must not currently exist!
262    #[allow(unused)]
263    Insert(V2),
264    /// Update the key-value pair for the provided key.
265    #[allow(unused)]
266    Update(V1, V2),
267}
268
269impl<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> MigrationAction<V1, V2> {
270    /// Converts `self` into a `Vec<StateUpdate<StateUpdateKindBinary>>` that can be appended
271    /// to persist.
272    fn into_updates(self) -> Vec<(StateUpdateKindJson, Diff)> {
273        match self {
274            MigrationAction::Delete(kind) => {
275                vec![(kind.into(), Diff::MINUS_ONE)]
276            }
277            MigrationAction::Insert(kind) => {
278                vec![(kind.into(), Diff::ONE)]
279            }
280            MigrationAction::Update(old_kind, new_kind) => {
281                vec![
282                    (old_kind.into(), Diff::MINUS_ONE),
283                    (new_kind.into(), Diff::ONE),
284                ]
285            }
286        }
287    }
288}
289
290/// Upgrades the data in the catalog to version [`CATALOG_VERSION`].
291///
292/// Returns the current upper after all migrations have executed.
293#[mz_ore::instrument(name = "persist::upgrade", level = "debug")]
294pub(crate) async fn upgrade(
295    persist_handle: &mut UnopenedPersistCatalogState,
296    mut commit_ts: Timestamp,
297) -> Result<Timestamp, CatalogError> {
298    soft_assert_ne_or_log!(
299        persist_handle.upper,
300        Timestamp::minimum(),
301        "cannot upgrade uninitialized catalog"
302    );
303
304    // Consolidate to avoid migrating old state.
305    persist_handle.consolidate();
306    let mut version = persist_handle
307        .get_user_version()
308        .await?
309        .expect("initialized catalog must have a version");
310    // Run migrations until we're up-to-date.
311    while version < CATALOG_VERSION {
312        (version, commit_ts) = run_upgrade(persist_handle, version, commit_ts).await?;
313    }
314
315    Ok(commit_ts)
316}
317
318/// Determines which upgrade to run for the `version` and executes it.
319///
320/// Returns the new version and upper.
321async fn run_upgrade(
322    unopened_catalog_state: &mut UnopenedPersistCatalogState,
323    version: u64,
324    commit_ts: Timestamp,
325) -> Result<(u64, Timestamp), CatalogError> {
326    let incompatible = DurableCatalogError::IncompatibleDataVersion {
327        found_version: version,
328        min_catalog_version: MIN_CATALOG_VERSION,
329        catalog_version: CATALOG_VERSION,
330    }
331    .into();
332
333    match version {
334        ..=TOO_OLD_VERSION => Err(incompatible),
335
336        74 => {
337            run_versioned_upgrade(
338                unopened_catalog_state,
339                version,
340                commit_ts,
341                v74_to_v75::upgrade,
342            )
343            .await
344        }
345        75 => {
346            run_versioned_upgrade(
347                unopened_catalog_state,
348                version,
349                commit_ts,
350                v75_to_v76::upgrade,
351            )
352            .await
353        }
354        76 => {
355            run_versioned_upgrade(
356                unopened_catalog_state,
357                version,
358                commit_ts,
359                v76_to_v77::upgrade,
360            )
361            .await
362        }
363        77 => {
364            run_versioned_upgrade(
365                unopened_catalog_state,
366                version,
367                commit_ts,
368                v77_to_v78::upgrade,
369            )
370            .await
371        }
372        78 => {
373            run_versioned_upgrade(
374                unopened_catalog_state,
375                version,
376                commit_ts,
377                v78_to_v79::upgrade,
378            )
379            .await
380        }
381        79 => {
382            run_versioned_upgrade(
383                unopened_catalog_state,
384                version,
385                commit_ts,
386                v79_to_v80::upgrade,
387            )
388            .await
389        }
390
391        // Up-to-date, no migration needed!
392        CATALOG_VERSION => Ok((CATALOG_VERSION, commit_ts)),
393        FUTURE_VERSION.. => Err(incompatible),
394    }
395}
396
397/// Runs `migration_logic` on the contents of the current catalog assuming a current version of
398/// `current_version`.
399///
400/// Returns the new version and upper.
401async fn run_versioned_upgrade<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson>(
402    unopened_catalog_state: &mut UnopenedPersistCatalogState,
403    current_version: u64,
404    mut commit_ts: Timestamp,
405    migration_logic: impl FnOnce(Vec<V1>) -> Vec<MigrationAction<V1, V2>>,
406) -> Result<(u64, Timestamp), CatalogError> {
407    tracing::info!(current_version, "running versioned Catalog upgrade");
408
409    // 1. Use the V1 to deserialize the contents of the current snapshot.
410    let snapshot: Vec<_> = unopened_catalog_state
411        .snapshot
412        .iter()
413        .map(|(kind, ts, diff)| {
414            soft_assert_eq_or_log!(
415                *diff,
416                Diff::ONE,
417                "snapshot is consolidated, ({kind:?}, {ts:?}, {diff:?})"
418            );
419            V1::try_from(kind.clone()).expect("invalid catalog data persisted")
420        })
421        .collect();
422
423    // 2. Generate updates from version specific migration logic.
424    let migration_actions = migration_logic(snapshot);
425    let mut updates: Vec<_> = migration_actions
426        .into_iter()
427        .flat_map(|action| action.into_updates().into_iter())
428        .collect();
429    // Validate that we're not migrating an un-migratable collection.
430    for (update, _) in &updates {
431        if update.is_always_deserializable() {
432            panic!("migration to un-migratable collection: {update:?}\nall updates: {updates:?}");
433        }
434    }
435
436    // 3. Add a retraction for old version and insertion for new version into updates.
437    let next_version = current_version + 1;
438    let version_retraction = (version_update_kind(current_version), Diff::MINUS_ONE);
439    updates.push(version_retraction);
440    let version_insertion = (version_update_kind(next_version), Diff::ONE);
441    updates.push(version_insertion);
442
443    // 4. Apply migration to catalog.
444    if matches!(unopened_catalog_state.mode, Mode::Writable) {
445        commit_ts = unopened_catalog_state
446            .compare_and_append(updates, commit_ts)
447            .await
448            .map_err(|e| e.unwrap_fence_error())?;
449    } else {
450        let ts = commit_ts;
451        let updates = updates
452            .into_iter()
453            .map(|(kind, diff)| StateUpdate { kind, ts, diff });
454        commit_ts = commit_ts.step_forward();
455        unopened_catalog_state.apply_updates(updates)?;
456    }
457
458    // 5. Consolidate snapshot to remove old versions.
459    unopened_catalog_state.consolidate();
460
461    Ok((next_version, commit_ts))
462}
463
464/// Generates a [`proto::StateUpdateKind`] to update the user version.
465fn version_update_kind(version: u64) -> StateUpdateKindJson {
466    // We can use the current version because Configs can never be migrated and are always wire
467    // compatible.
468    StateUpdateKind::Config(
469        proto::ConfigKey {
470            key: USER_VERSION_KEY.to_string(),
471        },
472        proto::ConfigValue { value: version },
473    )
474    .into()
475}