mz_catalog/durable/upgrade.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module contains all the helpers and code paths for upgrading/migrating the `Catalog`.
11//!
12//! We facilitate migrations by keeping snapshots of the objects we previously stored, and relying
13//! entirely on these snapshots. These snapshots exist in the [`mz_catalog_protos`] crate in the
14//! form of `catalog-protos/protos/objects_vXX.proto`. By maintaining and relying on snapshots we
//! don't have to worry about changes elsewhere in the codebase affecting our migrations because
16//! our application and serialization logic is decoupled, and the objects of the Catalog for a
17//! given version are "frozen in time".
18//!
19//! > **Note**: The protobuf snapshot files themselves live in a separate crate because it takes a
20//! relatively significant amount of time to codegen and build them. By placing them in
21//! a separate crate we don't have to pay this compile time cost when building the
22//! catalog, allowing for faster iteration.
23//!
//! You cannot make any changes to the following messages or anything that they depend on:
25//!
26//! - Config
27//! - Setting
28//! - FenceToken
29//! - AuditLog
30//!
31//! When you want to make a change to the `Catalog` you need to follow these steps:
32//!
33//! 1. Check the current [`CATALOG_VERSION`], make sure an `objects_v<CATALOG_VERSION>.proto` file
34//! exists. If one doesn't, copy and paste the current `objects.proto` file, renaming it to
35//! `objects_v<CATALOG_VERSION>.proto`.
36//! 2. Bump [`CATALOG_VERSION`] by one.
37//! 3. Make your changes to `objects.proto`.
38//! 4. Copy and paste `objects.proto`, naming the copy `objects_v<CATALOG_VERSION>.proto`. Update
39//! the package name of the `.proto` to `package objects_v<CATALOG_VERSION>;`
40//! 5. We should now have a copy of the protobuf objects as they currently exist, and a copy of
41//! how we want them to exist. For example, if the version of the Catalog before we made our
42//! changes was 15, we should now have `objects_v15.proto` and `objects_v16.proto`.
//! 6. Rebuild Materialize, which will fail because the hashes stored in
//!    `src/catalog-protos/protos/hashes.json` have now changed. Update these to match the new
//!    hashes for `objects.proto` and `objects_v<CATALOG_VERSION>.proto`.
//! 7. Add `v<CATALOG_VERSION>` to the call to the `objects!` macro in this file (in the second,
//!    post-v79 list), and to the `proto_objects!` macro in the [`mz_catalog_protos`] crate.
48//! 8. Add a new file to `catalog/src/durable/upgrade` which is where we'll put the new migration
49//! path.
//! 9. Write upgrade functions using the two versions of the protos we now have, e.g.
//!    `objects_v15.proto` and `objects_v16.proto`. In this migration code you __should not__
//!    import any defaults or constants from elsewhere in the codebase, because a future
//!    change could then impact a previous migration.
//! 10. Add a module declaration for your new migration to this file:
//!     `mod v<CATALOG_VERSION-1>_to_v<CATALOG_VERSION>;`
55//! 11. Call your upgrade function in [`run_upgrade()`].
56//! 12. Generate a test file for the new version:
57//! ```ignore
58//! cargo test --package mz-catalog --lib durable::upgrade::tests::generate_missing_encodings -- --ignored
59//! ```
60//!
61//! When in doubt, reach out to the Surfaces team, and we'll be more than happy to help :)
62
63pub mod json_compatible;
64#[cfg(test)]
65mod tests;
66
67use mz_ore::{soft_assert_eq_or_log, soft_assert_ne_or_log};
68use mz_repr::Diff;
69use paste::paste;
70#[cfg(test)]
71use proptest::prelude::*;
72#[cfg(test)]
73use proptest::strategy::ValueTree;
74#[cfg(test)]
75use proptest_derive::Arbitrary;
76use timely::progress::Timestamp as TimelyTimestamp;
77
78use crate::durable::initialize::USER_VERSION_KEY;
79use crate::durable::objects::serialization::proto;
80use crate::durable::objects::state_update::{
81 IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
82};
83use crate::durable::persist::{Mode, Timestamp, UnopenedPersistCatalogState};
84use crate::durable::{CatalogError, DurableCatalogError};
85
/// How many arbitrary `StateUpdateKind` values the generated test helpers produce for a single
/// catalog version (used by `AllVersionsStateUpdateKind::arbitrary_vec`).
#[cfg(test)]
const ENCODED_TEST_CASES: usize = 100;
88
/// Generate per-version support code.
///
/// Here we have to deal with the fact that the pre-v79 objects had a protobuf-generated format,
/// which gives them additional levels of indirection that the post-v79 objects don't have and thus
/// requires slightly different code to be generated.
///
/// The macro takes two lists of version identifiers (e.g. `v74`):
///
/// - `$x_old`: pre-v79 versions, whose `StateUpdateKind` wraps its payload in an optional `kind`
///   field.
/// - `$x`: v79 and later, whose `StateUpdateKind` is used directly.
///
/// For every version it emits a `pub(crate) mod objects_vXX` that re-exports
/// `mz_catalog_protos::objects_vXX` and implements conversions between that version's
/// `StateUpdateKind` and [`StateUpdateKindJson`]. In test builds it also emits
/// `AllVersionsStateUpdateKind`, an enum with one variant per version (e.g. `V74`), plus helpers
/// to generate arbitrary data and round-trip it through [`StateUpdateKindJson`].
macro_rules! objects {
    ( [$( $x_old:ident ),*], [$( $x:ident ),*] ) => {
        paste! {
            $(
                // Pre-v79 versions: only the inner `kind` is serialized to JSON, and decoding
                // re-wraps it in `Some`. `StateUpdateKind` and `state_update_kind` come from the
                // glob re-export below.
                pub(crate) mod [<objects_ $x_old>] {
                    pub use mz_catalog_protos::[<objects_ $x_old>]::*;

                    use crate::durable::objects::state_update::StateUpdateKindJson;

                    impl From<StateUpdateKind> for StateUpdateKindJson {
                        fn from(value: StateUpdateKind) -> Self {
                            // Panics if `kind` is `None`.
                            let kind = value.kind.expect("kind should be set");
                            // TODO: This requires that the json->proto->json roundtrips
                            // exactly, see database-issues#7179.
                            StateUpdateKindJson::from_serde(&kind)
                        }
                    }

                    impl From<StateUpdateKindJson> for StateUpdateKind {
                        fn from(value: StateUpdateKindJson) -> Self {
                            let kind: state_update_kind::Kind = value.to_serde();
                            StateUpdateKind { kind: Some(kind) }
                        }
                    }
                }
            )*

            $(
                // v79 and later: the whole `StateUpdateKind` is serialized to JSON as-is.
                pub(crate) mod [<objects_ $x>] {
                    pub use mz_catalog_protos::[<objects_ $x>]::*;

                    use crate::durable::objects::state_update::StateUpdateKindJson;

                    impl From<StateUpdateKind> for StateUpdateKindJson {
                        fn from(value: StateUpdateKind) -> Self {
                            // TODO: This requires that the json->proto->json roundtrips
                            // exactly, see database-issues#7179.
                            StateUpdateKindJson::from_serde(&value)
                        }
                    }

                    impl From<StateUpdateKindJson> for StateUpdateKind {
                        fn from(value: StateUpdateKindJson) -> Self {
                            value.to_serde()
                        }
                    }
                }
            )*

            // Generate test helpers for each version.

            // One variant per version, named after the upper-cased version identifier (e.g.
            // `V74`), wrapping that version's `StateUpdateKind`.
            #[cfg(test)]
            #[derive(Debug, Arbitrary)]
            enum AllVersionsStateUpdateKind {
                $(
                    [<$x_old:upper>](crate::durable::upgrade::[<objects_ $x_old>]::StateUpdateKind),
                )*
                $(
                    [<$x:upper>](crate::durable::upgrade::[<objects_ $x>]::StateUpdateKind),
                )*
            }

            // NOTE(review): the per-method `#[cfg(test)]` attributes below are redundant with the
            // one on this `impl`.
            #[cfg(test)]
            impl AllVersionsStateUpdateKind {
                // Generates `ENCODED_TEST_CASES` values for `version` (e.g. "objects_v74") with a
                // deterministic runner. Values that `arbitrary` skips (`Ok(None)`) are dropped and
                // generation keeps going until enough values are collected. Returns `Err` for an
                // unrecognized version.
                #[cfg(test)]
                fn arbitrary_vec(version: &str) -> Result<Vec<Self>, String> {
                    let mut runner = proptest::test_runner::TestRunner::deterministic();
                    std::iter::repeat(())
                        .filter_map(|_| {
                            AllVersionsStateUpdateKind::arbitrary(version, &mut runner)
                                .transpose()
                        })
                        .take(ENCODED_TEST_CASES)
                        .collect::<Result<_, _>>()
                }

                // Generates a single arbitrary value for `version`. Returns `Ok(None)` for
                // pre-v79 values whose `kind` is `None`, and `Err` for an unrecognized version.
                #[cfg(test)]
                fn arbitrary(
                    version: &str,
                    runner: &mut proptest::test_runner::TestRunner,
                ) -> Result<Option<Self>, String> {
                    match version {
                        $(
                            concat!("objects_", stringify!($x_old)) => {
                                let arbitrary_data =
                                    crate::durable::upgrade
                                        ::[<objects_ $x_old>]::StateUpdateKind::arbitrary()
                                        .new_tree(runner)
                                        .expect("unable to create arbitrary data")
                                        .current();
                                // Skip over generated data where kind is None
                                // because they are not interesting or possible in
                                // production. Unfortunately any of the inner fields
                                // can still be None, which is also not possible in
                                // production.
                                // TODO(jkosh44) See if there's an arbitrary config
                                // that forces Some.
                                let arbitrary_data = if arbitrary_data.kind.is_some() {
                                    Some(Self::[<$x_old:upper>](arbitrary_data))
                                } else {
                                    None
                                };
                                Ok(arbitrary_data)
                            }
                        )*
                        $(
                            concat!("objects_", stringify!($x)) => {
                                let arbitrary_data =
                                    crate::durable::upgrade
                                        ::[<objects_ $x>]::StateUpdateKind::arbitrary()
                                        .new_tree(runner)
                                        .expect("unable to create arbitrary data")
                                        .current();
                                Ok(Some(Self::[<$x:upper>](arbitrary_data)))
                            }
                        )*
                        _ => Err(format!("unrecognized version {version} add enum variant")),
                    }
                }

                // Decodes `raw` as the `StateUpdateKind` of `version` (e.g. "objects_v80").
                // Returns `Err` for an unrecognized version.
                #[cfg(test)]
                fn try_from_raw(version: &str, raw: StateUpdateKindJson) -> Result<Self, String> {
                    match version {
                        $(
                            concat!("objects_", stringify!($x_old)) => Ok(Self::[<$x_old:upper>](raw.into())),
                        )*
                        $(
                            concat!("objects_", stringify!($x)) => Ok(Self::[<$x:upper>](raw.into())),
                        )*
                        _ => Err(format!("unrecognized version {version} add enum variant")),
                    }
                }

                // Encodes the wrapped value back into its JSON representation.
                #[cfg(test)]
                fn raw(self) -> StateUpdateKindJson {
                    match self {
                        $(
                            Self::[<$x_old:upper>](kind) => kind.into(),
                        )*
                        $(
                            Self::[<$x:upper>](kind) => kind.into(),
                        )*
                    }
                }
            }
        }
    }
}
243
// Generate the `objects_vXX` modules and the `AllVersionsStateUpdateKind` test helpers. Versions
// whose `StateUpdateKind` wraps its payload in an optional `kind` field (everything before v79) go
// in the first list; v79 and later go in the second. Each listed version must have a matching
// `objects_vXX` module in `mz_catalog_protos`.
objects!([v74, v75, v76, v77, v78], [v79, v80]);
245
246/// The current version of the `Catalog`.
247pub use mz_catalog_protos::CATALOG_VERSION;
248/// The minimum `Catalog` version number that we support migrating from.
249pub use mz_catalog_protos::MIN_CATALOG_VERSION;
250
251// Note(parkmycar): Ideally we wouldn't have to define these extra constants,
252// but const expressions aren't yet supported in match statements.
253const TOO_OLD_VERSION: u64 = MIN_CATALOG_VERSION - 1;
254const FUTURE_VERSION: u64 = CATALOG_VERSION + 1;
255
256mod v74_to_v75;
257mod v75_to_v76;
258mod v76_to_v77;
259mod v77_to_v78;
260mod v78_to_v79;
261mod v79_to_v80;
262
/// Describes a single action to take during a migration from `V1` to `V2`.
///
/// `V1` is the object type of the version being migrated from and `V2` the object type of the
/// version being migrated to; both convert into `StateUpdateKindJson` (see `into_updates`).
// NOTE: the derived `PartialOrd`/`Ord` order values by variant declaration order first, so
// reordering the variants below changes how actions compare.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum MigrationAction<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> {
    /// Deletes the provided key.
    // NOTE(review): every variant carries `#[allow(unused)]`, presumably so the build stays
    // warning-free while no current migration constructs that variant — confirm.
    #[allow(unused)]
    Delete(V1),
    /// Inserts the provided key-value pair. The key must not currently exist!
    #[allow(unused)]
    Insert(V2),
    /// Update the key-value pair for the provided key: the old `V1` entry is retracted and the
    /// new `V2` entry is inserted.
    #[allow(unused)]
    Update(V1, V2),
}
276
277impl<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> MigrationAction<V1, V2> {
278 /// Converts `self` into a `Vec<StateUpdate<StateUpdateKindBinary>>` that can be appended
279 /// to persist.
280 fn into_updates(self) -> Vec<(StateUpdateKindJson, Diff)> {
281 match self {
282 MigrationAction::Delete(kind) => {
283 vec![(kind.into(), Diff::MINUS_ONE)]
284 }
285 MigrationAction::Insert(kind) => {
286 vec![(kind.into(), Diff::ONE)]
287 }
288 MigrationAction::Update(old_kind, new_kind) => {
289 vec![
290 (old_kind.into(), Diff::MINUS_ONE),
291 (new_kind.into(), Diff::ONE),
292 ]
293 }
294 }
295 }
296}
297
298/// Upgrades the data in the catalog to version [`CATALOG_VERSION`].
299///
300/// Returns the current upper after all migrations have executed.
301#[mz_ore::instrument(name = "persist::upgrade", level = "debug")]
302pub(crate) async fn upgrade(
303 persist_handle: &mut UnopenedPersistCatalogState,
304 mut commit_ts: Timestamp,
305) -> Result<Timestamp, CatalogError> {
306 soft_assert_ne_or_log!(
307 persist_handle.upper,
308 Timestamp::minimum(),
309 "cannot upgrade uninitialized catalog"
310 );
311
312 // Consolidate to avoid migrating old state.
313 persist_handle.consolidate();
314 let mut version = persist_handle
315 .get_user_version()
316 .await?
317 .expect("initialized catalog must have a version");
318 // Run migrations until we're up-to-date.
319 while version < CATALOG_VERSION {
320 (version, commit_ts) = run_upgrade(persist_handle, version, commit_ts).await?;
321 }
322
323 Ok(commit_ts)
324}
325
326/// Determines which upgrade to run for the `version` and executes it.
327///
328/// Returns the new version and upper.
329async fn run_upgrade(
330 unopened_catalog_state: &mut UnopenedPersistCatalogState,
331 version: u64,
332 commit_ts: Timestamp,
333) -> Result<(u64, Timestamp), CatalogError> {
334 let incompatible = DurableCatalogError::IncompatibleDataVersion {
335 found_version: version,
336 min_catalog_version: MIN_CATALOG_VERSION,
337 catalog_version: CATALOG_VERSION,
338 }
339 .into();
340
341 match version {
342 ..=TOO_OLD_VERSION => Err(incompatible),
343
344 74 => {
345 run_versioned_upgrade(
346 unopened_catalog_state,
347 version,
348 commit_ts,
349 v74_to_v75::upgrade,
350 )
351 .await
352 }
353 75 => {
354 run_versioned_upgrade(
355 unopened_catalog_state,
356 version,
357 commit_ts,
358 v75_to_v76::upgrade,
359 )
360 .await
361 }
362 76 => {
363 run_versioned_upgrade(
364 unopened_catalog_state,
365 version,
366 commit_ts,
367 v76_to_v77::upgrade,
368 )
369 .await
370 }
371 77 => {
372 run_versioned_upgrade(
373 unopened_catalog_state,
374 version,
375 commit_ts,
376 v77_to_v78::upgrade,
377 )
378 .await
379 }
380 78 => {
381 run_versioned_upgrade(
382 unopened_catalog_state,
383 version,
384 commit_ts,
385 v78_to_v79::upgrade,
386 )
387 .await
388 }
389 79 => {
390 run_versioned_upgrade(
391 unopened_catalog_state,
392 version,
393 commit_ts,
394 v79_to_v80::upgrade,
395 )
396 .await
397 }
398
399 // Up-to-date, no migration needed!
400 CATALOG_VERSION => Ok((CATALOG_VERSION, commit_ts)),
401 FUTURE_VERSION.. => Err(incompatible),
402 }
403}
404
405/// Runs `migration_logic` on the contents of the current catalog assuming a current version of
406/// `current_version`.
407///
408/// Returns the new version and upper.
409async fn run_versioned_upgrade<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson>(
410 unopened_catalog_state: &mut UnopenedPersistCatalogState,
411 current_version: u64,
412 mut commit_ts: Timestamp,
413 migration_logic: impl FnOnce(Vec<V1>) -> Vec<MigrationAction<V1, V2>>,
414) -> Result<(u64, Timestamp), CatalogError> {
415 tracing::info!(current_version, "running versioned Catalog upgrade");
416
417 // 1. Use the V1 to deserialize the contents of the current snapshot.
418 let snapshot: Vec<_> = unopened_catalog_state
419 .snapshot
420 .iter()
421 .map(|(kind, ts, diff)| {
422 soft_assert_eq_or_log!(
423 *diff,
424 Diff::ONE,
425 "snapshot is consolidated, ({kind:?}, {ts:?}, {diff:?})"
426 );
427 V1::try_from(kind.clone()).expect("invalid catalog data persisted")
428 })
429 .collect();
430
431 // 2. Generate updates from version specific migration logic.
432 let migration_actions = migration_logic(snapshot);
433 let mut updates: Vec<_> = migration_actions
434 .into_iter()
435 .flat_map(|action| action.into_updates().into_iter())
436 .collect();
437 // Validate that we're not migrating an un-migratable collection.
438 for (update, _) in &updates {
439 if update.is_always_deserializable() {
440 panic!("migration to un-migratable collection: {update:?}\nall updates: {updates:?}");
441 }
442 }
443
444 // 3. Add a retraction for old version and insertion for new version into updates.
445 let next_version = current_version + 1;
446 let version_retraction = (version_update_kind(current_version), Diff::MINUS_ONE);
447 updates.push(version_retraction);
448 let version_insertion = (version_update_kind(next_version), Diff::ONE);
449 updates.push(version_insertion);
450
451 // 4. Apply migration to catalog.
452 if matches!(unopened_catalog_state.mode, Mode::Writable) {
453 commit_ts = unopened_catalog_state
454 .compare_and_append(updates, commit_ts)
455 .await
456 .map_err(|e| e.unwrap_fence_error())?;
457 } else {
458 let ts = commit_ts;
459 let updates = updates
460 .into_iter()
461 .map(|(kind, diff)| StateUpdate { kind, ts, diff });
462 commit_ts = commit_ts.step_forward();
463 unopened_catalog_state.apply_updates(updates)?;
464 }
465
466 // 5. Consolidate snapshot to remove old versions.
467 unopened_catalog_state.consolidate();
468
469 Ok((next_version, commit_ts))
470}
471
472/// Generates a [`proto::StateUpdateKind`] to update the user version.
473fn version_update_kind(version: u64) -> StateUpdateKindJson {
474 // We can use the current version because Configs can never be migrated and are always wire
475 // compatible.
476 StateUpdateKind::Config(
477 proto::ConfigKey {
478 key: USER_VERSION_KEY.to_string(),
479 },
480 proto::ConfigValue { value: version },
481 )
482 .into()
483}