// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::{
    shard_id, Transaction, BUILTIN_MIGRATION_SEED, EXPRESSION_CACHE_SEED,
};
use mz_catalog::memory::objects::StateUpdate;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql_parser::ast::{Raw, Statement};
use semver::Version;
use tracing::info;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};
fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
F: for<'a> FnMut(
&'a mut Transaction<'_>,
&'a mut Statement<Raw>,
) -> Result<(), anyhow::Error>,
let mut updated_items = BTreeMap::new();
for mut item in tx.get_items() {
let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
f(tx, item.id, &mut stmt)?;
item.create_sql = stmt.to_ast_string_stable();
updated_items.insert(item.id, item);
fn rewrite_items<F>(
tx: &mut Transaction<'_>,
cat: &ConnCatalog<'_>,
mut f: F,
) -> Result<(), anyhow::Error>
F: for<'a> FnMut(
&'a mut Transaction<'_>,
&'a &ConnCatalog<'_>,
&'a mut Statement<Raw>,
) -> Result<(), anyhow::Error>,
let mut updated_items = BTreeMap::new();
let items = tx.get_items();
for mut item in items {
let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
f(tx, &cat, item.id, &mut stmt)?;
item.create_sql = stmt.to_ast_string_stable();
updated_items.insert(item.id, item);
/// Migrates all user items and loads them into `state`.
/// Returns the builtin updates corresponding to all user items.
pub(crate) async fn migrate(
state: &mut CatalogState,
tx: &mut Transaction<'_>,
local_expr_cache: &mut LocalExpressionCache,
item_updates: Vec<StateUpdate>,
_now: NowFn,
_boot_ts: Timestamp,
) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, anyhow::Error> {
let catalog_version = tx.get_catalog_content_version();
let catalog_version = match catalog_version {
Some(v) => Version::parse(&v)?,
None => Version::new(0, 0, 0),
"migrating statements from catalog version {:?}",
rewrite_ast_items(tx, |_tx, _id, _stmt| {
// Add per-item AST migrations below.
// Each migration should be a function that takes `stmt` (the AST
// representing the creation SQL for the item) as input. Any
// mutations to `stmt` will be staged for commit to the catalog.
// Migration functions may also take `tx` as input to stage
// arbitrary changes to the catalog.
// Load items into catalog. We make sure to consolidate the old updates with the new updates to
// avoid trying to apply unmigrated items.
let commit_ts = tx.upper();
let mut item_updates = into_consolidatable_updates_startup(item_updates, commit_ts);
let op_item_updates = tx.get_and_commit_op_updates();
let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
differential_dataflow::consolidation::consolidate_updates(&mut item_updates);
let item_updates = item_updates
.map(|(kind, ts, diff)| StateUpdate {
kind: kind.into(),
diff: diff.try_into().expect("valid diff"),
let mut ast_builtin_table_updates = state
.apply_updates_for_bootstrap(item_updates, local_expr_cache)
info!("migrating from catalog version {:?}", catalog_version);
let conn_cat = state.for_system_session();
rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
let _catalog_version = catalog_version.clone();
// Add per-item, post-planning AST migrations below. Most
// migrations should be in the above `rewrite_ast_items` block.
// Each migration should be a function that takes `item` (the AST
// representing the creation SQL for the item) as input. Any
// mutations to `item` will be staged for commit to the catalog.
// Be careful if you reference `conn_cat`. Doing so is *weird*,
// as you'll be rewriting the catalog while looking at it. If
// possible, make your migration independent of `conn_cat`, and only
// consider a single item at a time.
// Migration functions may also take `tx` as input to stage
// arbitrary changes to the catalog.
// Add whole-catalog migrations below.
// Each migration should be a function that takes `tx` and `conn_cat` as
// input and stages arbitrary transformations to the catalog on `tx`.
let op_item_updates = tx.get_and_commit_op_updates();
let item_builtin_table_updates = state
.apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
"migration from catalog version {:?} complete",
// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
// Please include the adapter team on any code reviews that add or edit
// migrations.
// Durable migrations
/// Migrations that run only on the durable catalog before any data is loaded into memory.
pub(crate) fn durable_migrate(
tx: &mut Transaction,
organization_id: Uuid,
_boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
// Insert the builtin migration shard into the settings collection.
if tx.get_builtin_migration_shard().is_none() {
let builtin_migration_shard = builtin_migration_shard_id(organization_id);
// Insert the expression cache shard into the settings collection.
if tx.get_expression_cache_shard().is_none() {
let expression_cache_shard = expression_cache_shard_id(organization_id);
/// Deterministically generate a builtin table migration shard ID for the given
/// `organization_id`.
fn builtin_migration_shard_id(organization_id: Uuid) -> ShardId {
shard_id(organization_id, BUILTIN_MIGRATION_SEED)
/// Deterministically generate an expression cache shard ID for the given
/// `organization_id`.
pub fn expression_cache_shard_id(organization_id: Uuid) -> ShardId {
shard_id(organization_id, EXPRESSION_CACHE_SEED)
// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
// The convention is to name the migration function using snake case:
// > <category>_<description>_<version>
// Please include the adapter team on any code reviews that add or edit
// migrations.