mz_catalog/durable/upgrade.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module contains all the helpers and code paths for upgrading/migrating the `Catalog`.
11//!
12//! We facilitate migrations by keeping snapshots of the objects we previously stored, and relying
13//! entirely on these snapshots. These snapshots exist in the [`mz_catalog_protos`] crate in the
14//! form of `catalog-protos/protos/objects_vXX.proto`. By maintaining and relying on snapshots we
//! don't have to worry about changes elsewhere in the codebase affecting our migrations because
16//! our application and serialization logic is decoupled, and the objects of the Catalog for a
17//! given version are "frozen in time".
18//!
19//! > **Note**: The protobuf snapshot files themselves live in a separate crate because it takes a
20//! relatively significant amount of time to codegen and build them. By placing them in
21//! a separate crate we don't have to pay this compile time cost when building the
22//! catalog, allowing for faster iteration.
23//!
//! You cannot make any changes to the following messages or anything that they depend on:
25//!
26//! - Config
27//! - Setting
28//! - FenceToken
29//! - AuditLog
30//!
31//! When you want to make a change to the `Catalog` you need to follow these steps:
32//!
33//! 1. Check the current [`CATALOG_VERSION`], make sure an `objects_v<CATALOG_VERSION>.proto` file
34//! exists. If one doesn't, copy and paste the current `objects.proto` file, renaming it to
35//! `objects_v<CATALOG_VERSION>.proto`.
36//! 2. Bump [`CATALOG_VERSION`] by one.
37//! 3. Make your changes to `objects.proto`.
38//! 4. Copy and paste `objects.proto`, naming the copy `objects_v<CATALOG_VERSION>.proto`. Update
39//! the package name of the `.proto` to `package objects_v<CATALOG_VERSION>;`
40//! 5. We should now have a copy of the protobuf objects as they currently exist, and a copy of
41//! how we want them to exist. For example, if the version of the Catalog before we made our
42//! changes was 15, we should now have `objects_v15.proto` and `objects_v16.proto`.
//! 6. Rebuild Materialize, which will error because the hashes stored in
44//! `src/catalog-protos/protos/hashes.json` have now changed. Update these to match the new
45//! hashes for objects.proto and `objects_v<CATALOG_VERSION>.proto`.
//! 7. Add `v<CATALOG_VERSION>` to the second (post-v79) list in the call to the `objects!` macro in this file.
47//! 8. Add a new file to `catalog/src/durable/upgrade` which is where we'll put the new migration
48//! path.
49//! 9. Write upgrade functions using the two versions of the protos we now have, e.g.
50//! `objects_v15.proto` and `objects_v16.proto`. In this migration code you __should not__
51//! import any defaults or constants from elsewhere in the codebase, because then a future
52//! change could then impact a previous migration.
//! 10. Add a module declaration for your new module to this file: `mod v<CATALOG_VERSION-1>_to_v<CATALOG_VERSION>;`
54//! 11. Call your upgrade function in [`run_upgrade()`].
55//! 12. Generate a test file for the new version:
56//! ```ignore
57//! cargo test --package mz-catalog --lib durable::upgrade::tests::generate_missing_encodings -- --ignored
58//! ```
59//!
60//! When in doubt, reach out to the Surfaces team, and we'll be more than happy to help :)
61
62pub mod json_compatible;
63#[cfg(test)]
64mod tests;
65
66use mz_ore::{soft_assert_eq_or_log, soft_assert_ne_or_log};
67use mz_repr::Diff;
68use paste::paste;
69#[cfg(test)]
70use proptest::prelude::*;
71#[cfg(test)]
72use proptest::strategy::ValueTree;
73#[cfg(test)]
74use proptest_derive::Arbitrary;
75use timely::progress::Timestamp as TimelyTimestamp;
76
77use crate::durable::initialize::USER_VERSION_KEY;
78use crate::durable::objects::serialization::proto;
79use crate::durable::objects::state_update::{
80 IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
81};
82use crate::durable::persist::{Mode, Timestamp, UnopenedPersistCatalogState};
83use crate::durable::{CatalogError, DurableCatalogError};
84
/// Number of arbitrary `StateUpdateKind` values the test helpers generate per catalog version
/// (the `take` limit in `AllVersionsStateUpdateKind::arbitrary_vec`).
#[cfg(test)]
const ENCODED_TEST_CASES: usize = 100;
87
/// Generate per-version support code.
///
/// Here we have to deal with the fact that the pre-v79 objects had a protobuf-generated format,
/// which gives them additional levels of indirection that the post-v79 objects don't have and thus
/// requires slightly different code to be generated.
///
/// The macro takes two bracketed lists of version identifiers (e.g. `v74`):
///
/// - the first list (`$x_old`) holds versions whose `StateUpdateKind` wraps an optional
///   `state_update_kind::Kind` in a `kind` field;
/// - the second list (`$x`) holds versions whose `StateUpdateKind` is (de)serialized directly.
///
/// For every version it emits a `pub(crate) mod objects_<version>` that glob re-exports
/// `mz_catalog_protos::objects_<version>` and implements conversions between that version's
/// `StateUpdateKind` and `StateUpdateKindJson`. Under `cfg(test)` it also emits the
/// `AllVersionsStateUpdateKind` enum, with one upper-cased variant per version (e.g. `V74`), plus
/// helpers that generate, parse and re-serialize data for a version selected by name.
macro_rules! objects {
    ( [$( $x_old:ident ),*], [$( $x:ident ),*] ) => {
        paste! {
            // Pre-v79 versions: `StateUpdateKind` is a struct with an optional `kind` field, so
            // the conversions unwrap / re-wrap that field around the JSON representation.
            $(
                pub(crate) mod [<objects_ $x_old>] {
                    pub use mz_catalog_protos::[<objects_ $x_old>]::*;

                    use crate::durable::objects::state_update::StateUpdateKindJson;

                    impl From<StateUpdateKind> for StateUpdateKindJson {
                        fn from(value: StateUpdateKind) -> Self {
                            // Panics if `kind` is `None`.
                            let kind = value.kind.expect("kind should be set");
                            // TODO: This requires that the json->proto->json roundtrips
                            // exactly, see database-issues#7179.
                            StateUpdateKindJson::from_serde(&kind)
                        }
                    }

                    impl From<StateUpdateKindJson> for StateUpdateKind {
                        fn from(value: StateUpdateKindJson) -> Self {
                            let kind: state_update_kind::Kind = value.to_serde();
                            StateUpdateKind { kind: Some(kind) }
                        }
                    }
                }
            )*

            // v79 and later: `StateUpdateKind` itself is converted to/from JSON directly.
            $(
                pub(crate) mod [<objects_ $x>] {
                    pub use mz_catalog_protos::[<objects_ $x>]::*;

                    use crate::durable::objects::state_update::StateUpdateKindJson;

                    impl From<StateUpdateKind> for StateUpdateKindJson {
                        fn from(value: StateUpdateKind) -> Self {
                            // TODO: This requires that the json->proto->json roundtrips
                            // exactly, see database-issues#7179.
                            StateUpdateKindJson::from_serde(&value)
                        }
                    }

                    impl From<StateUpdateKindJson> for StateUpdateKind {
                        fn from(value: StateUpdateKindJson) -> Self {
                            value.to_serde()
                        }
                    }
                }
            )*

            // Generate test helpers for each version.

            // One variant per supported version, wrapping that version's `StateUpdateKind`.
            #[cfg(test)]
            #[derive(Debug, Arbitrary)]
            enum AllVersionsStateUpdateKind {
                $(
                    [<$x_old:upper>](crate::durable::upgrade::[<objects_ $x_old>]::StateUpdateKind),
                )*
                $(
                    [<$x:upper>](crate::durable::upgrade::[<objects_ $x>]::StateUpdateKind),
                )*
            }

            #[cfg(test)]
            impl AllVersionsStateUpdateKind {
                // Generates `ENCODED_TEST_CASES` values for `version` (e.g. "objects_v74") using
                // a deterministic proptest runner. `Ok(None)` results from `arbitrary` are
                // skipped; the first `Err` (unknown version) is returned.
                #[cfg(test)]
                fn arbitrary_vec(version: &str) -> Result<Vec<Self>, String> {
                    let mut runner = proptest::test_runner::TestRunner::deterministic();
                    std::iter::repeat(())
                        .filter_map(|_| {
                            AllVersionsStateUpdateKind::arbitrary(version, &mut runner)
                                .transpose()
                        })
                        .take(ENCODED_TEST_CASES)
                        .collect::<Result<_, _>>()
                }

                // Generates a single arbitrary value for `version`. Returns `Ok(None)` for
                // pre-v79 values whose `kind` is `None`, and `Err` for an unknown version name.
                #[cfg(test)]
                fn arbitrary(
                    version: &str,
                    runner: &mut proptest::test_runner::TestRunner,
                ) -> Result<Option<Self>, String> {
                    match version {
                        $(
                            concat!("objects_", stringify!($x_old)) => {
                                let arbitrary_data =
                                    crate::durable::upgrade
                                        ::[<objects_ $x_old>]::StateUpdateKind::arbitrary()
                                        .new_tree(runner)
                                        .expect("unable to create arbitrary data")
                                        .current();
                                // Skip over generated data where kind is None
                                // because they are not interesting or possible in
                                // production. Unfortunately any of the inner fields
                                // can still be None, which is also not possible in
                                // production.
                                // TODO(jkosh44) See if there's an arbitrary config
                                // that forces Some.
                                let arbitrary_data = if arbitrary_data.kind.is_some() {
                                    Some(Self::[<$x_old:upper>](arbitrary_data))
                                } else {
                                    None
                                };
                                Ok(arbitrary_data)
                            }
                        )*
                        $(
                            concat!("objects_", stringify!($x)) => {
                                let arbitrary_data =
                                    crate::durable::upgrade
                                        ::[<objects_ $x>]::StateUpdateKind::arbitrary()
                                        .new_tree(runner)
                                        .expect("unable to create arbitrary data")
                                        .current();
                                Ok(Some(Self::[<$x:upper>](arbitrary_data)))
                            }
                        )*
                        _ => Err(format!("unrecognized version {version} add enum variant")),
                    }
                }

                // Parses raw JSON as the `StateUpdateKind` of the named version.
                #[cfg(test)]
                fn try_from_raw(version: &str, raw: StateUpdateKindJson) -> Result<Self, String> {
                    match version {
                        $(
                            concat!("objects_", stringify!($x_old)) => Ok(Self::[<$x_old:upper>](raw.into())),
                        )*
                        $(
                            concat!("objects_", stringify!($x)) => Ok(Self::[<$x:upper>](raw.into())),
                        )*
                        _ => Err(format!("unrecognized version {version} add enum variant")),
                    }
                }

                // Serializes the wrapped value back to JSON using its version's conversion.
                #[cfg(test)]
                fn raw(self) -> StateUpdateKindJson {
                    match self {
                        $(
                            Self::[<$x_old:upper>](kind) => kind.into(),
                        )*
                        $(
                            Self::[<$x:upper>](kind) => kind.into(),
                        )*
                    }
                }
            }
        }
    }
}
242
// Instantiate the per-version modules and test helpers. The first list holds the pre-v79
// versions (protobuf-generated layout); the second holds v79 and later.
objects!(
    [v74, v75, v76, v77, v78],
    [v79, v80, v81, v82, v83, v84, v85]
);

/// The current version of the `Catalog`.
pub use mz_catalog_protos::CATALOG_VERSION;
/// The minimum `Catalog` version number that we support migrating from.
pub use mz_catalog_protos::MIN_CATALOG_VERSION;

// Note(parkmycar): Ideally we wouldn't have to define these extra constants,
// but const expressions aren't yet supported in match statements.
//
// They bound the open-ended range patterns `run_upgrade` uses to reject catalogs that are too
// old (`..=TOO_OLD_VERSION`) or too new (`FUTURE_VERSION..`).
const TOO_OLD_VERSION: u64 = MIN_CATALOG_VERSION - 1;
const FUTURE_VERSION: u64 = CATALOG_VERSION + 1;

// One module per migration step, named `v<N>_to_v<N+1>`; each is dispatched from `run_upgrade`.
mod v74_to_v75;
mod v75_to_v76;
mod v76_to_v77;
mod v77_to_v78;
mod v78_to_v79;
mod v79_to_v80;
mod v80_to_v81;
mod v81_to_v82;
mod v82_to_v83;
mod v83_to_v84;
mod v84_to_v85;
269
/// Describes a single action to take during a migration from `V1` to `V2`.
///
/// Version-specific migration functions return a list of these (see the `migration_logic`
/// parameter of `run_versioned_upgrade`); [`MigrationAction::into_updates`] turns each action
/// into the retractions/insertions that get applied to the catalog.
///
/// Note: variant order is significant for the derived `PartialOrd`/`Ord` impls.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum MigrationAction<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> {
    /// Deletes the provided key.
    // NOTE(review): the `allow(unused)` attributes presumably exist because the current set of
    // migrations may not construct every variant; confirm before removing them.
    #[allow(unused)]
    Delete(V1),
    /// Inserts the provided key-value pair. The key must not currently exist!
    #[allow(unused)]
    Insert(V2),
    /// Update the key-value pair for the provided key.
    #[allow(unused)]
    Update(V1, V2),
}
283
284impl<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> MigrationAction<V1, V2> {
285 /// Converts `self` into a `Vec<StateUpdate<StateUpdateKindBinary>>` that can be appended
286 /// to persist.
287 fn into_updates(self) -> Vec<(StateUpdateKindJson, Diff)> {
288 match self {
289 MigrationAction::Delete(kind) => {
290 vec![(kind.into(), Diff::MINUS_ONE)]
291 }
292 MigrationAction::Insert(kind) => {
293 vec![(kind.into(), Diff::ONE)]
294 }
295 MigrationAction::Update(old_kind, new_kind) => {
296 vec![
297 (old_kind.into(), Diff::MINUS_ONE),
298 (new_kind.into(), Diff::ONE),
299 ]
300 }
301 }
302 }
303}
304
305/// Upgrades the data in the catalog to version [`CATALOG_VERSION`].
306///
307/// Returns the current upper after all migrations have executed.
308#[mz_ore::instrument(name = "persist::upgrade", level = "debug")]
309pub(crate) async fn upgrade(
310 persist_handle: &mut UnopenedPersistCatalogState,
311 mut commit_ts: Timestamp,
312) -> Result<Timestamp, CatalogError> {
313 soft_assert_ne_or_log!(
314 persist_handle.upper,
315 Timestamp::minimum(),
316 "cannot upgrade uninitialized catalog"
317 );
318
319 // Consolidate to avoid migrating old state.
320 persist_handle.consolidate();
321 let mut version = persist_handle
322 .get_user_version()
323 .await?
324 .expect("initialized catalog must have a version");
325 // Run migrations until we're up-to-date.
326 while version < CATALOG_VERSION {
327 (version, commit_ts) = run_upgrade(persist_handle, version, commit_ts).await?;
328 }
329
330 Ok(commit_ts)
331}
332
/// Determines which upgrade to run for the `version` and executes it.
///
/// Each arm performs a single migration step; [`upgrade`] calls this repeatedly until the
/// catalog reaches [`CATALOG_VERSION`].
///
/// Returns the new version and upper.
///
/// # Errors
///
/// Returns [`DurableCatalogError::IncompatibleDataVersion`] (converted into [`CatalogError`])
/// when `version` is below [`MIN_CATALOG_VERSION`] or above [`CATALOG_VERSION`], and propagates
/// any error from the selected migration.
async fn run_upgrade(
    unopened_catalog_state: &mut UnopenedPersistCatalogState,
    version: u64,
    commit_ts: Timestamp,
) -> Result<(u64, Timestamp), CatalogError> {
    // Built up front because both the "too old" and the "too new" arms return it.
    let incompatible = DurableCatalogError::IncompatibleDataVersion {
        found_version: version,
        min_catalog_version: MIN_CATALOG_VERSION,
        catalog_version: CATALOG_VERSION,
    }
    .into();

    // This match is exhaustive over `u64` without a catch-all arm: bumping `CATALOG_VERSION`
    // without adding an arm for the previous version leaves a gap and fails to compile.
    match version {
        ..=TOO_OLD_VERSION => Err(incompatible),

        74 => {
            run_versioned_upgrade(
                unopened_catalog_state,
                version,
                commit_ts,
                v74_to_v75::upgrade,
            )
            .await
        }
        75 => {
            run_versioned_upgrade(
                unopened_catalog_state,
                version,
                commit_ts,
                v75_to_v76::upgrade,
            )
            .await
        }
        76 => {
            run_versioned_upgrade(
                unopened_catalog_state,
                version,
                commit_ts,
                v76_to_v77::upgrade,
            )
            .await
        }
        77 => {
            run_versioned_upgrade(
                unopened_catalog_state,
                version,
                commit_ts,
                v77_to_v78::upgrade,
            )
            .await
        }
        78 => {
            run_versioned_upgrade(
                unopened_catalog_state,
                version,
                commit_ts,
                v78_to_v79::upgrade,
            )
            .await
        }
        79 => {
            run_versioned_upgrade(
                unopened_catalog_state,
                version,
                commit_ts,
                v79_to_v80::upgrade,
            )
            .await
        }
        80 => {
            run_versioned_upgrade(
                unopened_catalog_state,
                version,
                commit_ts,
                v80_to_v81::upgrade,
            )
            .await
        }
        81 => {
            run_versioned_upgrade(
                unopened_catalog_state,
                version,
                commit_ts,
                v81_to_v82::upgrade,
            )
            .await
        }
        // v82→v83 is a one-shot byte-level repair, not a proto evolution. It
        // needs raw access to the snapshot's diffs (which `run_versioned_upgrade`
        // strips) so it plugs into `run_upgrade` directly.
        82 => v82_to_v83::upgrade(unopened_catalog_state, commit_ts).await,
        // v83→v84 is also dispatched directly rather than via `run_versioned_upgrade`, so it
        // is responsible for returning the new version and upper itself.
        83 => v83_to_v84::upgrade(unopened_catalog_state, commit_ts).await,
        84 => {
            run_versioned_upgrade(
                unopened_catalog_state,
                version,
                commit_ts,
                v84_to_v85::upgrade,
            )
            .await
        }
        // Up-to-date, no migration needed!
        CATALOG_VERSION => Ok((CATALOG_VERSION, commit_ts)),
        FUTURE_VERSION.. => Err(incompatible),
    }
}
442
443/// Runs `migration_logic` on the contents of the current catalog assuming a current version of
444/// `current_version`.
445///
446/// Returns the new version and upper.
447async fn run_versioned_upgrade<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson>(
448 unopened_catalog_state: &mut UnopenedPersistCatalogState,
449 current_version: u64,
450 mut commit_ts: Timestamp,
451 migration_logic: impl FnOnce(Vec<V1>) -> Vec<MigrationAction<V1, V2>>,
452) -> Result<(u64, Timestamp), CatalogError> {
453 tracing::info!(current_version, "running versioned Catalog upgrade");
454
455 // 1. Use the V1 to deserialize the contents of the current snapshot.
456 let snapshot: Vec<_> = unopened_catalog_state
457 .snapshot
458 .iter()
459 .map(|(kind, ts, diff)| {
460 soft_assert_eq_or_log!(
461 *diff,
462 Diff::ONE,
463 "snapshot is consolidated, ({kind:?}, {ts:?}, {diff:?})"
464 );
465 V1::try_from(kind.clone()).expect("invalid catalog data persisted")
466 })
467 .collect();
468
469 // 2. Generate updates from version specific migration logic.
470 let migration_actions = migration_logic(snapshot);
471 let mut updates: Vec<_> = migration_actions
472 .into_iter()
473 .flat_map(|action| action.into_updates().into_iter())
474 .collect();
475 // Validate that we're not migrating an un-migratable collection.
476 for (update, _) in &updates {
477 if update.is_always_deserializable() {
478 panic!("migration to un-migratable collection: {update:?}\nall updates: {updates:?}");
479 }
480 }
481
482 // 3. Add a retraction for old version and insertion for new version into updates.
483 let next_version = current_version + 1;
484 let version_retraction = (version_update_kind(current_version), Diff::MINUS_ONE);
485 updates.push(version_retraction);
486 let version_insertion = (version_update_kind(next_version), Diff::ONE);
487 updates.push(version_insertion);
488
489 // 4. Apply migration to catalog.
490 if matches!(unopened_catalog_state.mode, Mode::Writable) {
491 commit_ts = unopened_catalog_state
492 .compare_and_append(updates, commit_ts)
493 .await
494 .map_err(|e| e.unwrap_fence_error())?;
495 } else {
496 let ts = commit_ts;
497 let updates = updates
498 .into_iter()
499 .map(|(kind, diff)| StateUpdate { kind, ts, diff });
500 commit_ts = commit_ts.step_forward();
501 unopened_catalog_state.apply_updates_and_consolidate(updates)?;
502 }
503
504 // 5. Consolidate snapshot to remove old versions.
505 unopened_catalog_state.consolidate();
506
507 Ok((next_version, commit_ts))
508}
509
510/// Generates a [`proto::StateUpdateKind`] to update the user version.
511fn version_update_kind(version: u64) -> StateUpdateKindJson {
512 // We can use the current version because Configs can never be migrated and are always wire
513 // compatible.
514 StateUpdateKind::Config(
515 proto::ConfigKey {
516 key: USER_VERSION_KEY.to_string(),
517 },
518 proto::ConfigValue { value: version },
519 )
520 .into()
521}