use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::str::FromStr;
use futures::future::BoxFuture;
use mz_catalog::durable::{Item, Transaction};
use mz_catalog::memory::objects::{StateUpdate, StateUpdateKind};
use mz_ore::collections::CollectionExt;
use mz_ore::now::{EpochMillis, NowFn};
use mz_repr::{GlobalId, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::visit_mut::VisitMut;
use mz_sql::ast::{CreateSubsourceOption, CreateSubsourceOptionName, Ident};
use mz_sql::catalog::SessionCatalog;
use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_types::connections::ConnectionContext;
use semver::Version;
use tracing::info;
use crate::catalog::{CatalogState, ConnCatalog};
/// Applies `f` to the parsed AST of every item in `tx` and persists each
/// (possibly mutated) statement back into the transaction.
///
/// Parsing uses the stable AST string form, so items whose statements `f`
/// leaves untouched are rewritten to an identical `create_sql`.
async fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        GlobalId,
        &'a mut Statement<Raw>,
    ) -> BoxFuture<'a, Result<(), anyhow::Error>>,
{
    let mut rewritten = BTreeMap::new();
    for mut item in tx.get_items() {
        // Parse the durably stored SQL, let `f` rewrite it, then re-serialize.
        let mut ast = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, item.id, &mut ast).await?;
        item.create_sql = ast.to_ast_string_stable();
        rewritten.insert(item.id, item);
    }
    tx.update_items(rewritten)?;
    Ok(())
}
/// Applies `f` to the parsed AST of every item in `tx`, giving `f` access to
/// the in-memory catalog `cat`, and persists each (possibly mutated)
/// statement back into the transaction.
///
/// The closure receives `&'a ConnCatalog<'_>` directly rather than the
/// needless double reference (`&'a &ConnCatalog<'_>`) this previously took.
async fn rewrite_items<F>(
    tx: &mut Transaction<'_>,
    cat: &ConnCatalog<'_>,
    mut f: F,
) -> Result<(), anyhow::Error>
where
    F: for<'a> FnMut(
        &'a mut Transaction<'_>,
        &'a ConnCatalog<'_>,
        GlobalId,
        &'a mut Statement<Raw>,
    ) -> BoxFuture<'a, Result<(), anyhow::Error>>,
{
    let mut updated_items = BTreeMap::new();
    let items = tx.get_items();
    for mut item in items {
        // Parse the durably stored SQL, let `f` rewrite it, then re-serialize.
        let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
        f(tx, cat, item.id, &mut stmt).await?;
        item.create_sql = stmt.to_ast_string_stable();
        updated_items.insert(item.id, item);
    }
    tx.update_items(updated_items)?;
    Ok(())
}
/// Migrates user items' `create_sql` statements up from the durably recorded
/// catalog version to the current one.
///
/// Runs two rewrite passes — a purely syntactic one (`rewrite_ast_items`) and
/// one that consults an in-memory catalog hydrated from the transaction's
/// items (`rewrite_items`) — then the v0.98 subsource rewrite, which may also
/// reassign GlobalIds.
pub(crate) async fn migrate(
    state: &CatalogState,
    tx: &mut Transaction<'_>,
    now: NowFn,
    _boot_ts: Timestamp,
    _connection_context: &ConnectionContext,
) -> Result<(), anyhow::Error> {
    let catalog_version = tx.get_catalog_content_version();
    // A missing durable version is treated as 0.0.0, so every migration applies.
    let catalog_version = match catalog_version {
        Some(v) => Version::parse(&v)?,
        None => Version::new(0, 0, 0),
    };
    info!(
        "migrating statements from catalog version {:?}",
        catalog_version
    );
    // Pass 1: AST-only rewrites (no catalog context required).
    rewrite_ast_items(tx, |_tx, _id, stmt| {
        let _catalog_version = catalog_version.clone();
        Box::pin(async move {
            ast_rewrite_create_source_loadgen_options_0_92_0(stmt)?;
            Ok(())
        })
    })
    .await?;
    // Hydrate an in-memory catalog from the (already pass-1-rewritten) durable
    // items so pass 2 can resolve names and connections.
    let mut state = state.clone();
    let item_updates = tx
        .get_items()
        .map(|item| StateUpdate {
            kind: StateUpdateKind::Item(item),
            diff: 1,
        })
        .collect();
    state.apply_updates_for_bootstrap(item_updates)?;
    info!("migrating from catalog version {:?}", catalog_version);
    let conn_cat = state.for_system_session();
    // Pass 2: rewrites that need catalog context.
    rewrite_items(tx, &conn_cat, |_tx, conn_cat, _id, stmt| {
        let _catalog_version = catalog_version.clone();
        Box::pin(async move {
            ast_rewrite_create_source_pg_database_details(conn_cat, stmt)?;
            Ok(())
        })
    })
    .await?;
    subsource_rewrite_v0_98(tx, &conn_cat, now)?;
    info!(
        "migration from catalog version {:?} complete",
        catalog_version
    );
    Ok(())
}
fn subsource_rewrite_v0_98(
tx: &mut Transaction<'_>,
conn_catalog: &ConnCatalog,
now: NowFn,
) -> Result<(), anyhow::Error> {
use mz_sql::ast::UnresolvedItemName;
use mz_sql::catalog::CatalogItemType;
use mz_sql_parser::ast::RawItemName;
use mz_storage_types::connections::Connection;
use mz_storage_types::sources::GenericSourceConnection;
let mut needs_new_id = vec![];
let mut source_ids_of_updated_subsources = BTreeSet::new();
let mut updated_items = BTreeMap::new();
let mut source_map: BTreeMap<_, _> = tx
.get_items()
.into_iter()
.filter(|item| conn_catalog.get_item(&item.id).item_type() == CatalogItemType::Source)
.map(|item| (item.id, item))
.collect();
for source in conn_catalog
.get_items()
.iter()
.filter(|item| item.item_type() == CatalogItemType::Source)
{
let mut exports = source.source_exports();
exports.retain(|id, _| *id != source.id());
if exports.is_empty() {
continue;
}
let desc = source
.source_desc()
.expect("item is source with desc")
.expect("item is source with desc");
let external_reference_tables: Vec<_> = match &desc.connection {
GenericSourceConnection::Postgres(pg) => {
let connection = conn_catalog.get_item(&pg.connection_id);
let conn = connection
.connection()
.expect("generic source connection has connection details");
let database = match conn {
Connection::Postgres(p) => p.database.clone(),
_ => unreachable!("PG sources must have PG connections"),
};
pg.publication_details
.tables
.iter()
.map(|t| {
UnresolvedItemName(vec![
Ident::new_unchecked(database.clone()),
Ident::new_unchecked(t.namespace.clone()),
Ident::new_unchecked(t.name.clone()),
])
})
.collect()
}
GenericSourceConnection::MySql(mysql) => mysql
.details
.tables
.iter()
.map(|t| {
UnresolvedItemName(vec![
Ident::new_unchecked("mysql"),
Ident::new_unchecked(t.schema_name.clone()),
Ident::new_unchecked(t.name.clone()),
])
})
.collect(),
GenericSourceConnection::LoadGenerator(load_generator) => {
let prefix = UnresolvedItemName(vec![
Ident::new_unchecked(
mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME,
),
Ident::new_unchecked(load_generator.load_generator.schema_name()),
]);
load_generator
.load_generator
.views()
.into_iter()
.map(|(view_name, _desc)| {
let mut name = prefix.clone();
name.0.push(Ident::new_unchecked(view_name.to_string()));
name
})
.collect()
}
GenericSourceConnection::Kafka(_) => {
unreachable!("Kafka sources have non-self source exports")
}
};
let primary_source_full_name = conn_catalog.resolve_full_name(source.name());
let primary_source_name = mz_sql::normalize::unresolve(primary_source_full_name);
for (export_id, output_idx) in exports {
let subsource_item = conn_catalog.get_item(&export_id);
let mut subsource_stmt =
mz_sql_parser::parser::parse_statements(subsource_item.create_sql())
.expect("parsing persisted create_sql must succeed")
.into_element()
.ast;
match &mut subsource_stmt {
Statement::CreateSubsource(create_subsource_stmt) => {
create_subsource_stmt.of_source =
Some(RawItemName::Name(primary_source_name.clone()));
create_subsource_stmt
.with_options
.retain(|o| o.name != CreateSubsourceOptionName::References);
create_subsource_stmt
.with_options
.push(CreateSubsourceOption {
name: CreateSubsourceOptionName::ExternalReference,
value: Some(mz_sql::ast::WithOptionValue::UnresolvedItemName(
external_reference_tables[output_idx - 1].clone(),
)),
})
}
_ => unreachable!("subsource items must correlate to subsources"),
}
let mut subsource_item = source_map.remove(&export_id).expect("item exists");
subsource_item.create_sql = subsource_stmt.to_ast_string_stable();
let present = updated_items.insert(export_id, subsource_item);
assert_eq!(present, None, "each export only updated a single time");
if export_id < source.id() {
tracing::info!(
"subsource {} has a GlobalId less than its primary source ({})",
export_id,
source.id()
);
needs_new_id.push(export_id);
source_ids_of_updated_subsources.insert(source.id());
}
}
let primary_source_id = source.id();
let mut primary_source_item = source_map
.remove(&primary_source_id)
.expect("source exists");
let mut primary_source_stmt =
mz_sql_parser::parser::parse_statements(&primary_source_item.create_sql)
.expect("parsing persisted create_sql must succeed")
.into_element()
.ast;
match &mut primary_source_stmt {
Statement::CreateSource(create_source_stmt) => {
create_source_stmt.referenced_subsources = None;
}
_ => unreachable!("subsource items must correlate to subsources"),
}
primary_source_item.create_sql = primary_source_stmt.to_ast_string_stable();
let present = updated_items.insert(primary_source_id, primary_source_item);
assert_eq!(present, None, "each source only updated a single time");
}
tx.update_items(updated_items)?;
let mut remaining_updates = VecDeque::from_iter(needs_new_id.drain(..));
while let Some(id) = remaining_updates.pop_front() {
needs_new_id.push(id);
remaining_updates.extend(
conn_catalog
.state()
.get_entry(&id)
.used_by()
.iter()
.filter(|id| !source_ids_of_updated_subsources.contains(id))
.cloned(),
);
}
let mut id_dedup = BTreeSet::new();
needs_new_id = needs_new_id
.into_iter()
.rev()
.filter(|id| id_dedup.insert(*id))
.rev()
.collect();
assign_new_user_global_ids(tx, conn_catalog, now, needs_new_id)
}
/// Reassigns each `GlobalId` in `needs_new_id` — all of which must be user
/// IDs — to a freshly allocated user item ID, in order. Along the way it:
///
/// - re-inserts each item under its new ID, recording a create/drop audit
///   event pair for the rename,
/// - moves storage collection metadata to the new IDs,
/// - moves item comments to the new IDs, and
/// - rewrites `create_sql` for every item whose statement referenced a
///   reassigned ID by number.
fn assign_new_user_global_ids(
    tx: &mut Transaction<'_>,
    conn_catalog: &ConnCatalog,
    now: NowFn,
    needs_new_id: Vec<GlobalId>,
) -> Result<(), anyhow::Error> {
    use itertools::Itertools;
    use mz_audit_log::{FromPreviousIdV1, ToNewIdV1};
    use mz_ore::cast::CastFrom;
    use mz_sql::names::CommentObjectId;
    use mz_sql_parser::ast::RawItemName;
    use mz_storage_client::controller::StorageTxn;
    // (Renamed from the typo'd `news_new_id_set`.)
    let needs_new_id_set: BTreeSet<_> = needs_new_id
        .iter()
        .map(|id| {
            assert!(id.is_user(), "cannot assign new ID to non-user ID {:?}", id);
            id
        })
        .cloned()
        .collect();
    let mut update_items: BTreeMap<_, _> = tx
        .get_items()
        .into_iter()
        .filter_map(|item| match needs_new_id_set.contains(&item.id) {
            true => Some((item.id, item)),
            false => None,
        })
        .collect();
    // Allocate all replacement IDs up front; `zip_eq` below asserts the
    // counts match.
    let new_ids = tx.allocate_user_item_ids(u64::cast_from(needs_new_id.len()))?;
    tx.remove_items(needs_new_id_set.clone())?;
    let mut deleted_metadata: BTreeMap<_, _> = tx
        .delete_collection_metadata(needs_new_id_set)
        .into_iter()
        .collect();
    let mut updated_storage_collection_metadata = BTreeMap::new();
    let mut new_id_mapping = BTreeMap::new();
    let occurred_at = now();
    for (old_id, new_id) in needs_new_id.into_iter().zip_eq(new_ids.into_iter()) {
        let item = update_items.remove(&old_id).expect("known to be an entry");
        new_id_mapping.insert(item.id, new_id);
        let entry = conn_catalog.get_item(&item.id);
        tracing::info!("reassigning {} to {}", item.id, new_id);
        tx.insert_item(
            new_id,
            item.oid,
            item.schema_id,
            &item.name,
            item.create_sql,
            item.owner_id,
            item.privileges,
        )?;
        let object_type = entry.item_type().into();
        // Record the rename as a create (of the new ID, pointing at the old)
        // plus a drop (of the old ID, pointing at the new).
        add_to_audit_log(
            tx,
            mz_audit_log::EventType::Create,
            object_type,
            mz_audit_log::EventDetails::FromPreviousIdV1(FromPreviousIdV1 {
                previous_id: item.id.to_string(),
                id: new_id.to_string(),
            }),
            occurred_at,
        )?;
        add_to_audit_log(
            tx,
            mz_audit_log::EventType::Drop,
            object_type,
            mz_audit_log::EventDetails::ToNewIdV1(ToNewIdV1 {
                id: item.id.to_string(),
                new_id: new_id.to_string(),
            }),
            occurred_at,
        )?;
        // Carry the item's storage collection metadata over to the new ID.
        if let Some(metadata) = deleted_metadata.remove(&item.id) {
            tracing::info!(
                "reassigning {}'s storage metadata to {}: {}",
                item.id,
                new_id,
                metadata
            );
            updated_storage_collection_metadata.insert(new_id, metadata);
        }
    }
    tx.insert_collection_metadata(updated_storage_collection_metadata)?;
    // Move comments to the new IDs.
    for (old_id, new_id) in new_id_mapping.iter() {
        if conn_catalog.get_item_comments(old_id).is_some() {
            tracing::info!("reassigning {}'s comments to {}", old_id, new_id);
            let mut comment_id = conn_catalog
                .state()
                .get_comment_id(mz_sql::names::ObjectId::Item(*old_id));
            let curr_id = comment_id.clone();
            match &mut comment_id {
                CommentObjectId::Table(id)
                | CommentObjectId::View(id)
                | CommentObjectId::MaterializedView(id)
                | CommentObjectId::Source(id)
                | CommentObjectId::Sink(id)
                | CommentObjectId::Index(id)
                | CommentObjectId::Func(id)
                | CommentObjectId::Connection(id)
                | CommentObjectId::Type(id)
                | CommentObjectId::Secret(id) => *id = *new_id,
                // Non-item comment IDs cannot come from an ObjectId::Item.
                id @ (CommentObjectId::Role(_)
                | CommentObjectId::Database(_)
                | CommentObjectId::Schema(_)
                | CommentObjectId::Cluster(_)
                | CommentObjectId::ClusterReplica(_)) => {
                    anyhow::bail!("unexpected comment ID {:?}", id)
                }
            };
            let comments = tx.drop_comments(curr_id)?;
            for (_id, subcomponent, comment) in comments {
                tx.update_comment(comment_id, subcomponent, Some(comment))?;
            }
        }
    }
    // AST visitor that rewrites `RawItemName::Id` references to reassigned
    // IDs; the first ID-parse error encountered is retained in `err`.
    struct IdUpdater<'a> {
        new_id_mapping: &'a BTreeMap<GlobalId, GlobalId>,
        err: Result<(), anyhow::Error>,
    }
    let mut id_updater = IdUpdater {
        new_id_mapping: &new_id_mapping,
        err: Ok(()),
    };
    impl<'a> VisitMut<'_, Raw> for IdUpdater<'a> {
        fn visit_item_name_mut(&mut self, node: &'_ mut <Raw as mz_sql::ast::AstInfo>::ItemName) {
            if let RawItemName::Id(id, _) = node {
                match GlobalId::from_str(id.as_str()) {
                    Ok(curr_id) => {
                        if let Some(new_id) = self.new_id_mapping.get(&curr_id) {
                            *id = new_id.to_string();
                        }
                    }
                    Err(e) => {
                        if self.err.is_ok() {
                            self.err = Err(e);
                        }
                    }
                }
            }
        }
    }
    // Rewrite any item whose `create_sql` referenced a reassigned ID.
    let mut updated_items = BTreeMap::new();
    for item in tx.get_items() {
        let mut stmt = mz_sql_parser::parser::parse_statements(&item.create_sql)
            .expect("parsing persisted create_sql must succeed")
            .into_element()
            .ast;
        let original_redacted_sql = stmt.to_ast_string_redacted();
        id_updater.visit_statement_mut(&mut stmt);
        if id_updater.err.is_err() {
            return id_updater.err;
        }
        let new_ast_string = stmt.to_ast_string_stable();
        // Only persist items whose SQL actually changed.
        if item.create_sql != new_ast_string {
            tracing::info!(
                "{}'s `create_sql` string changed because of updated GlobalId\nwas: {}\n\nnow: {}",
                item.id,
                original_redacted_sql,
                stmt.to_ast_string_redacted()
            );
            updated_items.insert(
                item.id,
                Item {
                    id: item.id,
                    oid: item.oid,
                    schema_id: item.schema_id,
                    name: item.name,
                    create_sql: new_ast_string,
                    owner_id: item.owner_id,
                    privileges: item.privileges,
                },
            );
        }
    }
    tx.update_items(updated_items)?;
    Ok(())
}
/// Appends a single audit-log event to the transaction, allocating the next
/// audit-log ID from the transaction's ID allocator.
fn add_to_audit_log(
    tx: &mut Transaction,
    event_type: mz_audit_log::EventType,
    object_type: mz_audit_log::ObjectType,
    details: mz_audit_log::EventDetails,
    occurred_at: EpochMillis,
) -> Result<(), anyhow::Error> {
    let event_id =
        tx.get_and_increment_id(mz_catalog::durable::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
    tx.insert_audit_log_event(mz_audit_log::VersionedEvent::new(
        event_id,
        event_type,
        object_type,
        details,
        None,
        occurred_at,
    ));
    Ok(())
}
/// Drops load-generator source options that are not permitted for the
/// statement's generator variant (v0.92.0 migration). Non-load-generator
/// sources are left untouched.
fn ast_rewrite_create_source_loadgen_options_0_92_0(
    stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
    use mz_sql::ast::{
        CreateSourceConnection, CreateSourceStatement, LoadGenerator, LoadGeneratorOptionName::*,
    };
    struct Rewriter;
    impl<'ast> VisitMut<'ast, Raw> for Rewriter {
        fn visit_create_source_statement_mut(
            &mut self,
            node: &'ast mut CreateSourceStatement<Raw>,
        ) {
            if let CreateSourceConnection::LoadGenerator { generator, options } =
                &mut node.connection
            {
                // Each generator variant supports only a subset of options;
                // everything else is dropped.
                let permitted_options: &[_] = match generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Datums => &[TickInterval],
                    LoadGenerator::Counter => &[TickInterval, MaxCardinality],
                    LoadGenerator::Tpch => &[TickInterval, ScaleFactor],
                    LoadGenerator::KeyValue => &[
                        TickInterval,
                        Keys,
                        SnapshotRounds,
                        TransactionalSnapshot,
                        ValueSize,
                        Seed,
                        Partitions,
                        BatchSize,
                    ],
                };
                options.retain(|o| permitted_options.contains(&o.name));
            }
        }
    }
    Rewriter.visit_statement_mut(stmt);
    Ok(())
}
fn ast_rewrite_create_source_pg_database_details(
cat: &ConnCatalog<'_>,
stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
use mz_sql::ast::{
CreateSourceConnection, CreateSourceStatement, PgConfigOptionName, RawItemName, Value,
WithOptionValue,
};
use mz_storage_types::sources::postgres::ProtoPostgresSourcePublicationDetails;
use prost::Message;
struct Rewriter<'a> {
cat: &'a ConnCatalog<'a>,
}
impl<'ast> VisitMut<'ast, Raw> for Rewriter<'_> {
fn visit_create_source_statement_mut(
&mut self,
node: &'ast mut CreateSourceStatement<Raw>,
) {
match &mut node.connection {
CreateSourceConnection::Postgres {
connection,
options,
} => {
let details = options
.iter_mut()
.find(|o| o.name == PgConfigOptionName::Details)
.expect("PG sources must have details");
let details_val = match &mut details.value {
Some(WithOptionValue::Value(Value::String(details))) => details,
_ => unreachable!("PG source details' value must be a string"),
};
let details = hex::decode(details_val.clone())
.expect("PG source details must be a hex-encoded string");
let mut details = ProtoPostgresSourcePublicationDetails::decode(&*details)
.expect("PG source details must be a hex-encoded protobuf");
let conn = match connection {
RawItemName::Name(connection) => {
let connection =
mz_sql::normalize::unresolved_item_name(connection.clone())
.expect("PG source connection name must be valid");
self.cat
.resolve_item(&connection)
.expect("PG source connection must exist")
}
RawItemName::Id(id, _) => {
let gid = id
.parse()
.expect("RawItenName::Id must be uncorrupted GlobalId");
self.cat.state().get_entry(&gid)
}
};
let conn = conn
.connection()
.expect("PG source connection must reference a connection");
match &conn {
mz_storage_types::connections::Connection::Postgres(pg) => {
details.database = pg.database.clone();
}
_ => unreachable!("PG sources must use PG connections"),
};
*details_val = hex::encode(details.encode_to_vec());
}
_ => {}
}
}
}
Rewriter { cat }.visit_statement_mut(stmt);
Ok(())
}
/// Runs durable (non-AST) catalog migrations against the transaction.
pub(crate) fn durable_migrate(
    tx: &mut Transaction,
    boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
    catalog_fix_system_cluster_replica_ids_v_0_95_0(tx, boot_ts.into())?;
    Ok(())
}
/// Migrates replicas of system clusters that were durably recorded with user
/// replica IDs onto freshly allocated system replica IDs (v0.95.0 migration).
///
/// Each affected replica is re-inserted under its new ID, and — for managed
/// replicas, whose location carries the sizing details the events need — a
/// drop + create audit-log event pair is recorded at `boot_ts`.
fn catalog_fix_system_cluster_replica_ids_v_0_95_0(
    tx: &mut Transaction,
    boot_ts: EpochMillis,
) -> Result<(), anyhow::Error> {
    use mz_audit_log::{
        CreateClusterReplicaV1, DropClusterReplicaV1, EventDetails, EventType, ObjectType,
        VersionedEvent,
    };
    use mz_catalog::durable::ReplicaLocation;
    // Collect first so we can mutate `tx` while processing.
    let updated_replicas: Vec<_> = tx
        .get_cluster_replicas()
        .filter(|replica| replica.cluster_id.is_system() && replica.replica_id.is_user())
        .map(|replica| (replica.replica_id, replica))
        .collect();
    for (replica_id, mut updated_replica) in updated_replicas {
        let sys_id = tx.allocate_system_replica_id()?;
        updated_replica.replica_id = sys_id;
        tx.remove_cluster_replica(replica_id)?;
        tx.insert_cluster_replica(
            updated_replica.cluster_id,
            updated_replica.replica_id,
            &updated_replica.name,
            updated_replica.config.clone(),
            updated_replica.owner_id,
        )?;
        if let ReplicaLocation::Managed {
            size,
            disk,
            billed_as,
            internal,
            ..
        } = &updated_replica.config.location
        {
            // `find` replaces the clippy-flagged `filter(..).next()`.
            let cluster = tx
                .get_clusters()
                .find(|cluster| cluster.id == updated_replica.cluster_id)
                .expect("missing cluster");
            let drop_audit_id = tx.allocate_audit_log_id()?;
            let remove_event = VersionedEvent::new(
                drop_audit_id,
                EventType::Drop,
                ObjectType::ClusterReplica,
                EventDetails::DropClusterReplicaV1(DropClusterReplicaV1 {
                    cluster_id: updated_replica.cluster_id.to_string(),
                    cluster_name: cluster.name.clone(),
                    replica_id: Some(replica_id.to_string()),
                    replica_name: updated_replica.name.clone(),
                }),
                None,
                boot_ts,
            );
            let create_audit_id = tx.allocate_audit_log_id()?;
            let create_event = VersionedEvent::new(
                create_audit_id,
                EventType::Create,
                ObjectType::ClusterReplica,
                EventDetails::CreateClusterReplicaV1(CreateClusterReplicaV1 {
                    cluster_id: updated_replica.cluster_id.to_string(),
                    cluster_name: cluster.name.clone(),
                    replica_id: Some(updated_replica.replica_id.to_string()),
                    replica_name: updated_replica.name,
                    logical_size: size.clone(),
                    disk: *disk,
                    billed_as: billed_as.clone(),
                    internal: *internal,
                }),
                None,
                boot_ts,
            );
            tx.insert_audit_log_events([remove_event, create_event]);
        }
    }
    Ok(())
}