mz_catalog/durable/upgrade.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module contains all the helpers and code paths for upgrading/migrating the `Catalog`.
11//!
12//! We facilitate migrations by keeping snapshots of the objects we previously stored, and relying
13//! entirely on these snapshots. These snapshots exist in the [`mz_catalog_protos`] crate in the
14//! form of `catalog-protos/protos/objects_vXX.proto`. By maintaining and relying on snapshots we
//! don't have to worry about changes elsewhere in the codebase affecting our migrations because
16//! our application and serialization logic is decoupled, and the objects of the Catalog for a
17//! given version are "frozen in time".
18//!
19//! > **Note**: The protobuf snapshot files themselves live in a separate crate because it takes a
20//! relatively significant amount of time to codegen and build them. By placing them in
21//! a separate crate we don't have to pay this compile time cost when building the
22//! catalog, allowing for faster iteration.
23//!
//! You cannot make any changes to the following messages or anything that they depend on:
25//!
26//! - Config
27//! - Setting
28//! - FenceToken
29//! - AuditLog
30//!
31//! When you want to make a change to the `Catalog` you need to follow these steps:
32//!
33//! 1. Check the current [`CATALOG_VERSION`], make sure an `objects_v<CATALOG_VERSION>.proto` file
34//! exists. If one doesn't, copy and paste the current `objects.proto` file, renaming it to
35//! `objects_v<CATALOG_VERSION>.proto`.
36//! 2. Bump [`CATALOG_VERSION`] by one.
37//! 3. Make your changes to `objects.proto`.
38//! 4. Copy and paste `objects.proto`, naming the copy `objects_v<CATALOG_VERSION>.proto`. Update
39//! the package name of the `.proto` to `package objects_v<CATALOG_VERSION>;`
40//! 5. We should now have a copy of the protobuf objects as they currently exist, and a copy of
41//! how we want them to exist. For example, if the version of the Catalog before we made our
42//! changes was 15, we should now have `objects_v15.proto` and `objects_v16.proto`.
//! 6. Rebuild Materialize, which will error because the hashes stored in
//!    `src/catalog-protos/protos/hashes.json` have now changed. Update these to match the new
//!    hashes for `objects.proto` and `objects_v<CATALOG_VERSION>.proto`.
46//! 7. Add `v<CATALOG_VERSION>` to the call to the `objects!` macro in this file, and to the
47//! `proto_objects!` macro in the [`mz_catalog_protos`] crate.
48//! 8. Add a new file to `catalog/src/durable/upgrade` which is where we'll put the new migration
49//! path.
50//! 9. Write upgrade functions using the two versions of the protos we now have, e.g.
51//! `objects_v15.proto` and `objects_v16.proto`. In this migration code you __should not__
52//! import any defaults or constants from elsewhere in the codebase, because then a future
//!    change could impact a previous migration.
//! 10. Add an import for your new module to this file: `mod v<CATALOG_VERSION-1>_to_v<CATALOG_VERSION>;`.
55//! 11. Call your upgrade function in [`run_upgrade()`].
56//! 12. Generate a test file for the new version:
57//! ```ignore
58//! cargo test --package mz-catalog --lib durable::upgrade::tests::generate_missing_encodings -- --ignored
59//! ```
60//!
61//! When in doubt, reach out to the Surfaces team, and we'll be more than happy to help :)
62
63#[cfg(test)]
64mod tests;
65pub mod wire_compatible;
66
67use mz_ore::{soft_assert_eq_or_log, soft_assert_ne_or_log};
68use mz_repr::Diff;
69use paste::paste;
70#[cfg(test)]
71use proptest::prelude::*;
72#[cfg(test)]
73use proptest::strategy::ValueTree;
74#[cfg(test)]
75use proptest_derive::Arbitrary;
76use timely::progress::Timestamp as TimelyTimestamp;
77
78use crate::durable::initialize::USER_VERSION_KEY;
79use crate::durable::objects::serialization::proto;
80use crate::durable::objects::state_update::{
81 IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
82};
83use crate::durable::persist::{Mode, Timestamp, UnopenedPersistCatalogState};
84use crate::durable::{CatalogError, DurableCatalogError};
85
/// Number of arbitrary `StateUpdateKind` values generated per catalog version by the
/// test-only `AllVersionsStateUpdateKind::arbitrary_vec` helper.
#[cfg(test)]
const ENCODED_TEST_CASES: usize = 100;
88
/// Declares an `objects_<version>` module for every listed catalog version, plus test-only
/// helpers that operate across all of those versions.
///
/// For each `$x` (e.g. `v67`) this generates:
///
/// - `pub(crate) mod objects_$x`, which re-exports everything from
///   `mz_catalog_protos::objects_$x` and implements conversions between that version's
///   `StateUpdateKind` and the version-agnostic [`StateUpdateKindJson`].
/// - (tests only) a variant of `AllVersionsStateUpdateKind` (named after `$x` in upper case,
///   e.g. `V67`) that wraps that version's `StateUpdateKind`.
macro_rules! objects {
    ( $( $x:ident ),* ) => {
        paste! {
            $(
                // One module per catalog version, e.g. `objects_v67`.
                pub(crate) mod [<objects_ $x>] {
                    pub use mz_catalog_protos::[<objects_ $x>]::*;

                    use crate::durable::objects::state_update::StateUpdateKindJson;

                    // Encodes this version's `StateUpdateKind` as JSON. Panics if `kind` is
                    // `None`.
                    impl From<StateUpdateKind> for StateUpdateKindJson {
                        fn from(value: StateUpdateKind) -> Self {
                            let kind = value.kind.expect("kind should be set");
                            // TODO: This requires that the json->proto->json roundtrips
                            // exactly, see database-issues#7179.
                            StateUpdateKindJson::from_serde(&kind)
                        }
                    }

                    // Decodes JSON as this version's `StateUpdateKind`. This impl itself always
                    // returns `Ok`; NOTE(review): a decode failure inside `to_serde` is
                    // presumably reported by panicking — confirm against `StateUpdateKindJson`.
                    impl TryFrom<StateUpdateKindJson> for StateUpdateKind {
                        type Error = String;

                        fn try_from(value: StateUpdateKindJson) -> Result<Self, Self::Error> {
                            let kind: state_update_kind::Kind = value.to_serde();
                            Ok(StateUpdateKind { kind: Some(kind) })
                        }
                    }
                }
            )*

            // Generate test helpers for each version.

            // A `StateUpdateKind` from any of the listed catalog versions, tagged by version.
            #[cfg(test)]
            #[derive(Debug, Arbitrary)]
            enum AllVersionsStateUpdateKind {
                $(
                    [<$x:upper>](crate::durable::upgrade::[<objects_ $x>]::StateUpdateKind),
                )*
            }

            #[cfg(test)]
            impl AllVersionsStateUpdateKind {
                // Deterministically generates `ENCODED_TEST_CASES` values for `version`
                // (e.g. "objects_v67"). Candidates whose `kind` is `None` are skipped (they come
                // back as `Ok(None)` and are dropped by `transpose` + `filter_map`). Returns
                // `Err` if `version` is not one of the versions passed to `objects!`.
                #[cfg(test)]
                fn arbitrary_vec(version: &str) -> Result<Vec<Self>, String> {
                    let mut runner = proptest::test_runner::TestRunner::deterministic();
                    std::iter::repeat(())
                        .filter_map(|_| AllVersionsStateUpdateKind::arbitrary(version, &mut runner).transpose())
                        .take(ENCODED_TEST_CASES)
                        .collect::<Result<_, _>>()
                }

                // Generates one arbitrary value for `version` using `runner`.
                //
                // Returns `Ok(None)` when the generated value has no `kind`, and `Err` when
                // `version` is unrecognized. Panics if proptest cannot build a value tree.
                #[cfg(test)]
                fn arbitrary(
                    version: &str,
                    runner: &mut proptest::test_runner::TestRunner,
                ) -> Result<Option<Self>, String> {
                    match version {
                        $(
                            concat!("objects_", stringify!($x)) => {
                                let arbitrary_data =
                                    crate::durable::upgrade::[<objects_ $x>]::StateUpdateKind::arbitrary()
                                        .new_tree(runner)
                                        .expect("unable to create arbitrary data")
                                        .current();
                                // Skip over generated data where kind is None because they are not interesting or
                                // possible in production. Unfortunately any of the inner fields can still be None,
                                // which is also not possible in production.
                                // TODO(jkosh44) See if there's an arbitrary config that forces Some.
                                let arbitrary_data = if arbitrary_data.kind.is_some() {
                                    Some(Self::[<$x:upper>](arbitrary_data))
                                } else {
                                    None
                                };
                                Ok(arbitrary_data)
                            }
                        )*
                        _ => Err(format!("unrecognized version {version} add enum variant")),
                    }
                }

                // Decodes `raw` as `version`'s `StateUpdateKind`, wrapped in the matching
                // variant. Returns `Err` for an unrecognized version or a failed conversion.
                #[cfg(test)]
                fn try_from_raw(version: &str, raw: StateUpdateKindJson) -> Result<Self, String> {
                    match version {
                        $(
                            concat!("objects_", stringify!($x)) => Ok(Self::[<$x:upper>](raw.try_into()?)),
                        )*
                        _ => Err(format!("unrecognized version {version} add enum variant")),
                    }
                }

                // Encodes the wrapped value back into its JSON representation.
                #[cfg(test)]
                fn raw(self) -> StateUpdateKindJson {
                    match self {
                        $(
                            Self::[<$x:upper>](kind) => kind.into(),
                        )*
                    }
                }
            }
        }
    }
}
190
191objects!(v67, v68, v69, v70, v71, v72, v73, v74, v75, v76, v77);
192
193/// The current version of the `Catalog`.
194pub use mz_catalog_protos::CATALOG_VERSION;
195/// The minimum `Catalog` version number that we support migrating from.
196pub use mz_catalog_protos::MIN_CATALOG_VERSION;
197
198// Note(parkmycar): Ideally we wouldn't have to define these extra constants,
199// but const expressions aren't yet supported in match statements.
200const TOO_OLD_VERSION: u64 = MIN_CATALOG_VERSION - 1;
201const FUTURE_VERSION: u64 = CATALOG_VERSION + 1;
202
203mod v67_to_v68;
204mod v68_to_v69;
205mod v69_to_v70;
206mod v70_to_v71;
207mod v71_to_v72;
208mod v72_to_v73;
209mod v73_to_v74;
210mod v74_to_v75;
211mod v75_to_v76;
212mod v76_to_v77;
213
214/// Describes a single action to take during a migration from `V1` to `V2`.
215#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
216enum MigrationAction<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> {
217 /// Deletes the provided key.
218 #[allow(unused)]
219 Delete(V1),
220 /// Inserts the provided key-value pair. The key must not currently exist!
221 #[allow(unused)]
222 Insert(V2),
223 /// Update the key-value pair for the provided key.
224 #[allow(unused)]
225 Update(V1, V2),
226}
227
228impl<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson> MigrationAction<V1, V2> {
229 /// Converts `self` into a `Vec<StateUpdate<StateUpdateKindBinary>>` that can be appended
230 /// to persist.
231 fn into_updates(self) -> Vec<(StateUpdateKindJson, Diff)> {
232 match self {
233 MigrationAction::Delete(kind) => {
234 vec![(kind.into(), Diff::MINUS_ONE)]
235 }
236 MigrationAction::Insert(kind) => {
237 vec![(kind.into(), Diff::ONE)]
238 }
239 MigrationAction::Update(old_kind, new_kind) => {
240 vec![
241 (old_kind.into(), Diff::MINUS_ONE),
242 (new_kind.into(), Diff::ONE),
243 ]
244 }
245 }
246 }
247}
248
249/// Upgrades the data in the catalog to version [`CATALOG_VERSION`].
250///
251/// Returns the current upper after all migrations have executed.
252#[mz_ore::instrument(name = "persist::upgrade", level = "debug")]
253pub(crate) async fn upgrade(
254 persist_handle: &mut UnopenedPersistCatalogState,
255 mut commit_ts: Timestamp,
256) -> Result<Timestamp, CatalogError> {
257 soft_assert_ne_or_log!(
258 persist_handle.upper,
259 Timestamp::minimum(),
260 "cannot upgrade uninitialized catalog"
261 );
262
263 // Consolidate to avoid migrating old state.
264 persist_handle.consolidate();
265 let mut version = persist_handle
266 .get_user_version()
267 .await?
268 .expect("initialized catalog must have a version");
269 // Run migrations until we're up-to-date.
270 while version < CATALOG_VERSION {
271 (version, commit_ts) = run_upgrade(persist_handle, version, commit_ts).await?;
272 }
273
274 Ok(commit_ts)
275}
276
277/// Determines which upgrade to run for the `version` and executes it.
278///
279/// Returns the new version and upper.
280async fn run_upgrade(
281 unopened_catalog_state: &mut UnopenedPersistCatalogState,
282 version: u64,
283 commit_ts: Timestamp,
284) -> Result<(u64, Timestamp), CatalogError> {
285 let incompatible = DurableCatalogError::IncompatibleDataVersion {
286 found_version: version,
287 min_catalog_version: MIN_CATALOG_VERSION,
288 catalog_version: CATALOG_VERSION,
289 }
290 .into();
291
292 match version {
293 ..=TOO_OLD_VERSION => Err(incompatible),
294
295 67 => {
296 run_versioned_upgrade(
297 unopened_catalog_state,
298 version,
299 commit_ts,
300 v67_to_v68::upgrade,
301 )
302 .await
303 }
304 68 => {
305 run_versioned_upgrade(
306 unopened_catalog_state,
307 version,
308 commit_ts,
309 v68_to_v69::upgrade,
310 )
311 .await
312 }
313 69 => {
314 run_versioned_upgrade(
315 unopened_catalog_state,
316 version,
317 commit_ts,
318 v69_to_v70::upgrade,
319 )
320 .await
321 }
322 70 => {
323 run_versioned_upgrade(
324 unopened_catalog_state,
325 version,
326 commit_ts,
327 v70_to_v71::upgrade,
328 )
329 .await
330 }
331 71 => {
332 run_versioned_upgrade(
333 unopened_catalog_state,
334 version,
335 commit_ts,
336 v71_to_v72::upgrade,
337 )
338 .await
339 }
340 72 => {
341 run_versioned_upgrade(
342 unopened_catalog_state,
343 version,
344 commit_ts,
345 v72_to_v73::upgrade,
346 )
347 .await
348 }
349 73 => {
350 run_versioned_upgrade(
351 unopened_catalog_state,
352 version,
353 commit_ts,
354 v73_to_v74::upgrade,
355 )
356 .await
357 }
358 74 => {
359 run_versioned_upgrade(
360 unopened_catalog_state,
361 version,
362 commit_ts,
363 v74_to_v75::upgrade,
364 )
365 .await
366 }
367 75 => {
368 run_versioned_upgrade(
369 unopened_catalog_state,
370 version,
371 commit_ts,
372 v75_to_v76::upgrade,
373 )
374 .await
375 }
376 76 => {
377 run_versioned_upgrade(
378 unopened_catalog_state,
379 version,
380 commit_ts,
381 v76_to_v77::upgrade,
382 )
383 .await
384 }
385
386 // Up-to-date, no migration needed!
387 CATALOG_VERSION => Ok((CATALOG_VERSION, commit_ts)),
388 FUTURE_VERSION.. => Err(incompatible),
389 }
390}
391
392/// Runs `migration_logic` on the contents of the current catalog assuming a current version of
393/// `current_version`.
394///
395/// Returns the new version and upper.
396async fn run_versioned_upgrade<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateKindJson>(
397 unopened_catalog_state: &mut UnopenedPersistCatalogState,
398 current_version: u64,
399 mut commit_ts: Timestamp,
400 migration_logic: impl FnOnce(Vec<V1>) -> Vec<MigrationAction<V1, V2>>,
401) -> Result<(u64, Timestamp), CatalogError> {
402 tracing::info!(current_version, "running versioned Catalog upgrade");
403
404 // 1. Use the V1 to deserialize the contents of the current snapshot.
405 let snapshot: Vec<_> = unopened_catalog_state
406 .snapshot
407 .iter()
408 .map(|(kind, ts, diff)| {
409 soft_assert_eq_or_log!(
410 *diff,
411 Diff::ONE,
412 "snapshot is consolidated, ({kind:?}, {ts:?}, {diff:?})"
413 );
414 V1::try_from(kind.clone()).expect("invalid catalog data persisted")
415 })
416 .collect();
417
418 // 2. Generate updates from version specific migration logic.
419 let migration_actions = migration_logic(snapshot);
420 let mut updates: Vec<_> = migration_actions
421 .into_iter()
422 .flat_map(|action| action.into_updates().into_iter())
423 .collect();
424 // Validate that we're not migrating an un-migratable collection.
425 for (update, _) in &updates {
426 if update.is_always_deserializable() {
427 panic!("migration to un-migratable collection: {update:?}\nall updates: {updates:?}");
428 }
429 }
430
431 // 3. Add a retraction for old version and insertion for new version into updates.
432 let next_version = current_version + 1;
433 let version_retraction = (version_update_kind(current_version), Diff::MINUS_ONE);
434 updates.push(version_retraction);
435 let version_insertion = (version_update_kind(next_version), Diff::ONE);
436 updates.push(version_insertion);
437
438 // 4. Apply migration to catalog.
439 if matches!(unopened_catalog_state.mode, Mode::Writable) {
440 commit_ts = unopened_catalog_state
441 .compare_and_append(updates, commit_ts)
442 .await
443 .map_err(|e| e.unwrap_fence_error())?;
444 } else {
445 let ts = commit_ts;
446 let updates = updates
447 .into_iter()
448 .map(|(kind, diff)| StateUpdate { kind, ts, diff });
449 commit_ts = commit_ts.step_forward();
450 unopened_catalog_state.apply_updates(updates)?;
451 }
452
453 // 5. Consolidate snapshot to remove old versions.
454 unopened_catalog_state.consolidate();
455
456 Ok((next_version, commit_ts))
457}
458
459/// Generates a [`proto::StateUpdateKind`] to update the user version.
460fn version_update_kind(version: u64) -> StateUpdateKindJson {
461 // We can use the current version because Configs can never be migrated and are always wire
462 // compatible.
463 StateUpdateKind::Config(
464 proto::ConfigKey {
465 key: USER_VERSION_KEY.to_string(),
466 },
467 proto::ConfigValue { value: version },
468 )
469 .into()
470}