mod builtin_item_migration;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::future::{BoxFuture, FutureExt};
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
use mz_catalog::builtin::{
Builtin, Fingerprint, BUILTINS, BUILTIN_CLUSTERS, BUILTIN_CLUSTER_REPLICAS, BUILTIN_PREFIXES,
BUILTIN_ROLES, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
};
use mz_catalog::config::{ClusterReplicaSizeMap, StateConfig};
use mz_catalog::durable::objects::{
SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
};
use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
use mz_catalog::expr_cache::{
ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, StateUpdate,
};
use mz_catalog::SYSTEM_CONN_ID;
use mz_controller::clusters::{ReplicaAllocation, ReplicaLogging};
use mz_controller_types::ClusterId;
use mz_ore::cast::usize_to_u64;
use mz_ore::collections::HashSet;
use mz_ore::now::to_datetime;
use mz_ore::{instrument, soft_assert_no_log};
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::namespaces::is_unstable_schema;
use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
use mz_sql::catalog::{
BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
};
use mz_sql::func::OP_IMPLS;
use mz_sql::rbac;
use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
use mz_storage_client::storage_collections::StorageCollections;
use timely::Container;
use tracing::{info, warn, Instrument};
use uuid::Uuid;
use crate::catalog::open::builtin_item_migration::{
migrate_builtin_items, BuiltinItemMigrationResult,
};
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{
is_reserved_name, migrate, BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config,
};
use crate::AdapterError;
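/// The result of initializing an in-memory [`CatalogState`] from the durable catalog: the state
/// itself, the builtin collections that were migrated or newly created, pending builtin table
/// updates, the catalog version last seen on disk, and expression-cache artifacts.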
pub struct InitializeStateResult {
pub state: CatalogState,
pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
pub new_builtin_collections: BTreeSet<GlobalId>,
pub builtin_table_updates: Vec<BuiltinTableUpdate>,
pub last_seen_version: String,
pub expr_cache_handle: Option<ExpressionCacheHandle>,
pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
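/// The result of fully opening a [`Catalog`]; a thin wrapper around the fields of
/// [`InitializeStateResult`] that callers of [`Catalog::open`] need.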
pub struct OpenCatalogResult {
pub catalog: Catalog,
pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
pub new_builtin_collections: BTreeSet<GlobalId>,
pub builtin_table_updates: Vec<BuiltinTableUpdate>,
pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
impl Catalog {
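    /// Initializes a [`CatalogState`] from the contents of the durable catalog in `storage`,
    /// running all durable and in-memory migrations along the way.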
pub async fn initialize_state<'a>(
config: StateConfig,
storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
) -> Result<InitializeStateResult, AdapterError> {
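        // Sanity check: all builtin roles and clusters must use a reserved name prefix so they
        // can never collide with user-created objects.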
for builtin_role in BUILTIN_ROLES {
assert!(
is_reserved_name(builtin_role.name),
"builtin role {builtin_role:?} must start with one of the following prefixes {}",
BUILTIN_PREFIXES.join(", ")
);
}
for builtin_cluster in BUILTIN_CLUSTERS {
assert!(
is_reserved_name(builtin_cluster.name),
"builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
BUILTIN_PREFIXES.join(", ")
);
}
let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
if config.all_features {
system_configuration.enable_all_feature_flags_by_default();
}
let mut state = CatalogState {
database_by_name: BTreeMap::new(),
database_by_id: BTreeMap::new(),
entry_by_id: BTreeMap::new(),
entry_by_global_id: BTreeMap::new(),
ambient_schemas_by_name: BTreeMap::new(),
ambient_schemas_by_id: BTreeMap::new(),
clusters_by_name: BTreeMap::new(),
clusters_by_id: BTreeMap::new(),
roles_by_name: BTreeMap::new(),
roles_by_id: BTreeMap::new(),
network_policies_by_id: BTreeMap::new(),
network_policies_by_name: BTreeMap::new(),
system_configuration,
default_privileges: DefaultPrivileges::default(),
system_privileges: PrivilegeMap::default(),
comments: CommentsMap::default(),
source_references: BTreeMap::new(),
storage_metadata: Default::default(),
temporary_schemas: BTreeMap::new(),
config: mz_sql::catalog::CatalogConfig {
start_time: to_datetime((config.now)()),
start_instant: Instant::now(),
nonce: rand::random(),
environment_id: config.environment_id,
session_id: Uuid::new_v4(),
build_info: config.build_info,
timestamp_interval: Duration::from_secs(1),
now: config.now.clone(),
connection_context: config.connection_context,
builtins_cfg: BuiltinsConfig {
include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
&config.system_parameter_defaults,
config.remote_system_parameters.as_ref(),
&ENABLE_CONTINUAL_TASK_BUILTINS,
),
},
helm_chart_version: config.helm_chart_version,
},
cluster_replica_sizes: config.cluster_replica_sizes,
availability_zones: config.availability_zones,
egress_addresses: config.egress_addresses,
aws_principal_context: config.aws_principal_context,
aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
http_host_name: config.http_host_name,
};
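        // Pull the current contents of the durable catalog and open a transaction in which the
        // migrations below stage their changes.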
let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
assert!(!updates.is_empty(), "initial catalog snapshot is missing");
let mut txn = storage.transaction().await?;
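        // Migrate/update durable data before we start loading the in-memory catalog.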
let (migrated_builtins, new_builtin_collections) = {
migrate::durable_migrate(
&mut txn,
state.config.environment_id.organization_id(),
config.boot_ts,
)?;
if let Some(remote_system_parameters) = config.remote_system_parameters {
for (name, value) in remote_system_parameters {
txn.upsert_system_config(&name, value)?;
}
txn.set_system_config_synced_once()?;
}
let (migrated_builtins, new_builtin_collections) =
add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
system_cluster: config.builtin_system_cluster_config,
catalog_server_cluster: config.builtin_catalog_server_cluster_config,
probe_cluster: config.builtin_probe_cluster_config,
support_cluster: config.builtin_support_cluster_config,
analytics_cluster: config.builtin_analytics_cluster_config,
};
add_new_remove_old_builtin_clusters_migration(
&mut txn,
&builtin_bootstrap_cluster_config_map,
&state.cluster_replica_sizes,
)?;
add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
add_new_remove_old_builtin_cluster_replicas_migration(
&mut txn,
&builtin_bootstrap_cluster_config_map,
&state.cluster_replica_sizes,
)?;
add_new_remove_old_builtin_roles_migration(&mut txn)?;
remove_invalid_config_param_role_defaults_migration(&mut txn)?;
(migrated_builtins, new_builtin_collections)
};
remove_pending_cluster_replicas_migration(&mut txn)?;
let op_updates = txn.get_and_commit_op_updates();
updates.extend(op_updates);
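        // Seed the in-memory state with system parameter defaults and the system connection's
        // temporary schema.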
{
for (name, value) in config.system_parameter_defaults {
match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
Ok(_) => (),
Err(Error {
kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
}) => {
warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
}
Err(e) => return Err(e.into()),
};
}
state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
}
let mut builtin_table_updates = Vec::new();
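        // Stamp every update with the boot timestamp and consolidate, so that by this point each
        // remaining update is a plain insertion (diff of 1).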
let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
differential_dataflow::consolidation::consolidate_updates(&mut updates);
soft_assert_no_log!(
updates.iter().all(|(_, _, diff)| *diff == 1),
"consolidated updates should be positive during startup: {updates:?}"
);
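        // Bucket the updates so they can be applied in dependency order: objects that items
        // depend on first, then system items, then user items, then metadata that depends on
        // items, and finally the audit log.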
let mut pre_item_updates = Vec::new();
let mut system_item_updates = Vec::new();
let mut item_updates = Vec::new();
let mut post_item_updates = Vec::new();
let mut audit_log_updates = Vec::new();
for (kind, ts, diff) in updates {
match kind {
BootstrapStateUpdateKind::Role(_)
| BootstrapStateUpdateKind::Database(_)
| BootstrapStateUpdateKind::Schema(_)
| BootstrapStateUpdateKind::DefaultPrivilege(_)
| BootstrapStateUpdateKind::SystemPrivilege(_)
| BootstrapStateUpdateKind::SystemConfiguration(_)
| BootstrapStateUpdateKind::Cluster(_)
| BootstrapStateUpdateKind::NetworkPolicy(_)
| BootstrapStateUpdateKind::ClusterReplica(_) => {
pre_item_updates.push(StateUpdate {
kind: kind.into(),
ts,
diff: diff.try_into().expect("valid diff"),
})
}
BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
| BootstrapStateUpdateKind::SystemObjectMapping(_) => {
system_item_updates.push(StateUpdate {
kind: kind.into(),
ts,
diff: diff.try_into().expect("valid diff"),
})
}
BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
kind: kind.into(),
ts,
diff: diff.try_into().expect("valid diff"),
}),
BootstrapStateUpdateKind::Comment(_)
| BootstrapStateUpdateKind::StorageCollectionMetadata(_)
| BootstrapStateUpdateKind::SourceReferences(_)
| BootstrapStateUpdateKind::UnfinalizedShard(_) => {
post_item_updates.push((kind, ts, diff));
}
BootstrapStateUpdateKind::AuditLog(_) => {
audit_log_updates.push(StateUpdate {
kind: kind.into(),
ts,
diff: diff.try_into().expect("valid diff"),
});
}
}
}
let builtin_table_update = state
.apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
.await;
builtin_table_updates.extend(builtin_table_update);
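        // Open the expression cache, if enabled, so that item updates applied below can reuse
        // cached local and global expressions instead of re-optimizing.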
let expr_cache_start = Instant::now();
info!("startup: coordinator init: catalog open: expr cache open beginning");
let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
let expr_cache_enabled = config.enable_0dt_deployment
&& config
.enable_expression_cache_override
.unwrap_or(enable_expr_cache_dyncfg);
let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
info!(
?config.enable_0dt_deployment,
?config.enable_expression_cache_override,
?enable_expr_cache_dyncfg,
"using expression cache for startup"
);
let current_ids = txn
.get_items()
.flat_map(|item| {
let gid = item.global_id.clone();
let gids: Vec<_> = item.extra_versions.values().cloned().collect();
std::iter::once(gid).chain(gids.into_iter())
})
.chain(
txn.get_system_object_mappings()
.map(|som| som.unique_identifier.global_id),
)
.collect();
let dyncfgs = config.persist_client.dyncfgs().clone();
let build_version = if config.build_info.is_dev() {
config
.build_info
.semver_version_build()
.expect("build ID is not available on your platform!")
} else {
config.build_info.semver_version()
};
let expr_cache_config = ExpressionCacheConfig {
build_version,
shard_id: txn
.get_expression_cache_shard()
.expect("expression cache shard should exist for opened catalogs"),
persist: config.persist_client,
current_ids,
remove_prior_versions: !config.read_only,
compact_shard: config.read_only,
dyncfgs,
};
let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
(
Some(expr_cache_handle),
cached_local_exprs,
cached_global_exprs,
)
} else {
(None, BTreeMap::new(), BTreeMap::new())
};
let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
info!(
"startup: coordinator init: catalog open: expr cache open complete in {:?}",
expr_cache_start.elapsed()
);
let builtin_table_update = state
.apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
.await;
builtin_table_updates.extend(builtin_table_update);
let last_seen_version = txn
.get_catalog_content_version()
.unwrap_or_else(|| "new".to_string());
let builtin_table_update = if !config.skip_migrations {
let migrate_result = migrate::migrate(
&mut state,
&mut txn,
&mut local_expr_cache,
item_updates,
config.now,
config.boot_ts,
)
.await
.map_err(|e| {
Error::new(ErrorKind::FailedMigration {
last_seen_version: last_seen_version.clone(),
this_version: config.build_info.version,
cause: e.to_string(),
})
})?;
if !migrate_result.post_item_updates.is_empty() {
post_item_updates.extend(migrate_result.post_item_updates);
if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
for (_, ts, _) in &mut post_item_updates {
*ts = max_ts;
}
}
differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
}
migrate_result.builtin_table_updates
} else {
state
.apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
.await
};
builtin_table_updates.extend(builtin_table_update);
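        // Apply the deferred post-item updates now that all items exist in memory.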
let post_item_updates = post_item_updates
.into_iter()
.map(|(kind, ts, diff)| StateUpdate {
kind: kind.into(),
ts,
diff: diff.try_into().expect("valid diff"),
})
.collect();
let builtin_table_update = state
.apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
.await;
builtin_table_updates.extend(builtin_table_update);
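        // Audit log events are never loaded into the in-memory state; they only produce builtin
        // table updates.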
for audit_log_update in audit_log_updates {
builtin_table_updates.extend(
state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
);
}
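        // Migrate builtin storage collections whose definitions (fingerprints) changed since the
        // last boot.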
let BuiltinItemMigrationResult {
builtin_table_updates: builtin_table_update,
migrated_storage_collections_0dt,
cleanup_action,
} = migrate_builtin_items(
&mut state,
&mut txn,
&mut local_expr_cache,
migrated_builtins,
config.builtin_item_migration_config,
)
.await?;
builtin_table_updates.extend(builtin_table_update);
let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);
txn.commit(config.boot_ts).await?;
cleanup_action.await;
Ok(InitializeStateResult {
state,
migrated_storage_collections_0dt,
new_builtin_collections: new_builtin_collections.into_iter().collect(),
builtin_table_updates,
last_seen_version,
expr_cache_handle,
cached_global_exprs,
uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
})
}
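    /// Opens the catalog described by `config`, initializing its in-memory state from durable
    /// storage and returning the catalog together with startup metadata such as migrated and new
    /// builtin collections and the initial builtin table updates.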
#[instrument(name = "catalog::open")]
pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
async move {
let mut storage = config.storage;
let InitializeStateResult {
state,
migrated_storage_collections_0dt,
new_builtin_collections,
mut builtin_table_updates,
last_seen_version: _,
expr_cache_handle,
cached_global_exprs,
uncached_local_exprs,
            } = Self::initialize_state(config.state, &mut storage)
                .instrument(tracing::info_span!("catalog::initialize_state"))
                .boxed()
                .await?;
let catalog = Catalog {
state,
plans: CatalogPlans::default(),
expr_cache_handle,
transient_revision: 1,
storage: Arc::new(tokio::sync::Mutex::new(storage)),
};
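            // Operators aren't stored in the catalog, but we'd like them to appear in the
            // relevant builtin (introspection) tables.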
for (op, func) in OP_IMPLS.iter() {
match func {
mz_sql::func::Func::Scalar(impls) => {
for imp in impls {
builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
catalog.state.pack_op_update(op, imp.details(), 1),
));
}
}
_ => unreachable!("all operators must be scalar functions"),
}
}
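            // Report the environment's egress addresses in the corresponding builtin table.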
for ip in &catalog.state.egress_addresses {
builtin_table_updates.push(
catalog
.state
.resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
);
}
catalog.storage().await.mark_bootstrap_complete();
Ok(OpenCatalogResult {
catalog,
migrated_storage_collections_0dt,
new_builtin_collections,
builtin_table_updates,
cached_global_exprs,
uncached_local_exprs,
})
}
.instrument(tracing::info_span!("catalog::open"))
.boxed()
}
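    /// Initializes the storage controller's knowledge of all collections that `self` expects to
    /// exist. This must happen before any collections are created or rendered, since the storage
    /// layer may not yet know about system collections introduced by this version.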
async fn initialize_storage_state(
&mut self,
storage_collections: &Arc<
dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
>,
) -> Result<(), mz_catalog::durable::CatalogError> {
let collections = self
.entries()
.filter(|entry| entry.item().is_storage_collection())
.flat_map(|entry| entry.global_ids())
.collect();
let mut state = self.state.clone();
let mut storage = self.storage().await;
let mut txn = storage.transaction().await?;
storage_collections
.initialize_state(&mut txn, collections)
.await
.map_err(mz_catalog::durable::DurableCatalogError::from)?;
let updates = txn.get_and_commit_op_updates();
let builtin_updates = state.apply_updates(updates)?;
assert!(builtin_updates.is_empty());
let commit_ts = txn.upper();
txn.commit(commit_ts).await?;
drop(storage);
self.state = state;
Ok(())
}
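    /// Creates and initializes the cluster/storage controller from the current catalog contents.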
pub async fn initialize_controller(
&mut self,
config: mz_controller::ControllerConfig,
envd_epoch: core::num::NonZeroI64,
read_only: bool,
) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
{
let controller_start = Instant::now();
info!("startup: controller init: beginning");
let controller = {
let mut storage = self.storage().await;
let mut tx = storage.transaction().await?;
mz_controller::prepare_initialization(&mut tx)
.map_err(mz_catalog::durable::DurableCatalogError::from)?;
let updates = tx.get_and_commit_op_updates();
assert!(
updates.is_empty(),
"initializing controller should not produce updates: {updates:?}"
);
let commit_ts = tx.upper();
tx.commit(commit_ts).await?;
let read_only_tx = storage.transaction().await?;
mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
};
self.initialize_storage_state(&controller.storage_collections)
.await?;
info!(
"startup: controller init: complete in {:?}",
controller_start.elapsed()
);
Ok(controller)
}
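    /// Politely releases all external resources that can only be released in an async context.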
pub async fn expire(self) {
if let Some(storage) = Arc::into_inner(self.storage) {
let storage = storage.into_inner();
storage.expire().await;
}
}
}
impl CatalogState {
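    /// Sets the in-memory default value of the named system configuration parameter.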
fn set_system_configuration_default(
&mut self,
name: &str,
value: VarInput,
) -> Result<(), Error> {
Ok(self.system_configuration.set_default(name, value)?)
}
}
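/// Updates the durable catalog with new builtin items and removes old ones.
///
/// Returns the builtin [`CatalogItemId`]s whose definitions changed and therefore need to be
/// migrated, along with the [`GlobalId`]s of newly added builtin collections.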
fn add_new_remove_old_builtin_items_migration(
builtins_cfg: &BuiltinsConfig,
txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<(Vec<CatalogItemId>, Vec<GlobalId>), mz_catalog::durable::CatalogError> {
let mut new_builtin_mappings = Vec::new();
let mut migrated_builtin_ids = Vec::new();
let mut builtin_descs = HashSet::new();
let mut builtins = Vec::new();
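    // Collect all builtins, panicking if two share the same (schema, type, name) description.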
for builtin in BUILTINS::iter(builtins_cfg) {
let desc = SystemObjectDescription {
schema_name: builtin.schema().to_string(),
object_type: builtin.catalog_item_type(),
object_name: builtin.name().to_string(),
};
if !builtin_descs.insert(desc.clone()) {
panic!(
"duplicate builtin description: {:?}, {:?}",
SystemObjectDescription {
schema_name: builtin.schema().to_string(),
object_type: builtin.catalog_item_type(),
object_name: builtin.name().to_string(),
},
builtin
);
}
builtins.push((desc, builtin));
}
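    // The builtin mappings currently recorded in the durable catalog, keyed by description.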
let mut system_object_mappings: BTreeMap<_, _> = txn
.get_system_object_mappings()
.map(|system_object_mapping| {
(
system_object_mapping.description.clone(),
system_object_mapping,
)
})
.collect();
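    // Split the builtins into those that already have a durable mapping and those that are new.
    // Runtime-alterable objects get a sentinel fingerprint, since their definitions may be
    // altered at runtime and cannot be compared directly.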
let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
builtins.into_iter().partition_map(|(desc, builtin)| {
let fingerprint = match builtin.runtime_alterable() {
false => builtin.fingerprint(),
true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
};
match system_object_mappings.remove(&desc) {
Some(system_object_mapping) => {
Either::Left((builtin, system_object_mapping, fingerprint))
}
None => Either::Right((builtin, fingerprint)),
}
});
let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
let new_builtins = new_builtins.into_iter().zip(new_builtin_ids.clone());
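    // An existing builtin whose fingerprint changed has a new definition and must be migrated.
    // A few kinds of objects can never be migrated, which the asserts below enforce.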
for (builtin, system_object_mapping, fingerprint) in existing_builtins {
if system_object_mapping.unique_identifier.fingerprint != fingerprint {
assert_ne!(
*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, system_object_mapping.description,
"mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
);
assert_ne!(
builtin.catalog_item_type(),
CatalogItemType::Type,
"types cannot be migrated"
);
assert_ne!(
system_object_mapping.unique_identifier.fingerprint,
RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
"clearing the runtime alterable flag on an existing object is not permitted",
);
assert!(
!builtin.runtime_alterable(),
"setting the runtime alterable flag on an existing object is not permitted"
);
migrated_builtin_ids.push(system_object_mapping.unique_identifier.catalog_id);
}
}
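    // Record mappings for the new builtins. Runtime-alterable connections are additionally
    // inserted as regular durable items, since their SQL definitions live in the catalog.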
for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins {
new_builtin_mappings.push(SystemObjectMapping {
description: SystemObjectDescription {
schema_name: builtin.schema().to_string(),
object_type: builtin.catalog_item_type(),
object_name: builtin.name().to_string(),
},
unique_identifier: SystemObjectUniqueIdentifier {
catalog_id,
global_id,
fingerprint,
},
});
let handled_runtime_alterable = match builtin {
Builtin::Connection(c) if c.runtime_alterable => {
let mut acl_items = vec![rbac::owner_privilege(
mz_sql::catalog::ObjectType::Connection,
c.owner_id.clone(),
)];
acl_items.extend_from_slice(c.access);
let versions = BTreeMap::new();
txn.insert_item(
catalog_id,
c.oid,
global_id,
mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
c.name,
c.sql.into(),
*c.owner_id,
acl_items,
versions,
)?;
true
}
_ => false,
};
assert_eq!(
builtin.runtime_alterable(),
handled_runtime_alterable,
"runtime alterable object was not handled by migration",
);
}
txn.set_system_object_mappings(new_builtin_mappings)?;
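    // Anything still left in `system_object_mappings` is no longer in the builtin set and must
    // be removed; the assert below enforces that only objects in unstable schemas are deleted.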
let mut deleted_system_objects = BTreeSet::new();
let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
for (_, mapping) in system_object_mappings {
deleted_system_objects.insert(mapping.description);
if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
}
}
let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
assert!(
deleted_system_objects
.iter()
.filter(|object| object.object_type != CatalogItemType::Index)
.all(
|deleted_object| is_unstable_schema(&deleted_object.schema_name)
|| delete_exceptions.contains(deleted_object)
),
"only objects in unstable schemas can be deleted, deleted objects: {:?}",
deleted_system_objects
);
txn.remove_items(&deleted_runtime_alterable_system_ids)?;
txn.remove_system_object_mappings(deleted_system_objects)?;
let new_builtin_collections = new_builtin_ids
.into_iter()
.map(|(_catalog_id, global_id)| global_id)
.collect();
Ok((migrated_builtin_ids, new_builtin_collections))
}
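/// Creates any builtin clusters that are missing from the durable catalog and removes system
/// clusters that are no longer builtin.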
fn add_new_remove_old_builtin_clusters_migration(
txn: &mut mz_catalog::durable::Transaction<'_>,
builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
cluster_sizes: &ClusterReplicaSizeMap,
) -> Result<(), mz_catalog::durable::CatalogError> {
let mut durable_clusters: BTreeMap<_, _> = txn
.get_clusters()
.filter(|cluster| cluster.id.is_system())
.map(|cluster| (cluster.name.to_string(), cluster))
.collect();
for builtin_cluster in BUILTIN_CLUSTERS {
if durable_clusters.remove(builtin_cluster.name).is_none() {
let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
let cluster_allocation = cluster_sizes.get_allocation_by_name(&cluster_config.size)?;
txn.insert_system_cluster(
builtin_cluster.name,
vec![],
builtin_cluster.privileges.to_vec(),
builtin_cluster.owner_id.to_owned(),
mz_catalog::durable::ClusterConfig {
variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
size: cluster_config.size,
availability_zones: vec![],
replication_factor: cluster_config.replication_factor,
disk: cluster_allocation.is_cc,
logging: default_logging_config(),
optimizer_feature_overrides: Default::default(),
schedule: Default::default(),
}),
workload_class: None,
},
&HashSet::new(),
)?;
}
}
let old_clusters = durable_clusters
.values()
.map(|cluster| cluster.id)
.collect();
txn.remove_clusters(&old_clusters)?;
Ok(())
}
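/// Ensures that every cluster has exactly the current set of builtin introspection source
/// indexes, adding newly defined logs and removing ones that no longer exist.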
fn add_new_remove_old_builtin_introspection_source_migration(
txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<(), AdapterError> {
let mut new_indexes = Vec::new();
let mut removed_indexes = BTreeSet::new();
for cluster in txn.get_clusters() {
let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
let mut new_logs = Vec::new();
for log in BUILTINS::logs() {
if introspection_source_index_ids.remove(log.name).is_none() {
new_logs.push(log);
}
}
for log in new_logs {
let (item_id, gid) =
Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
}
removed_indexes.extend(
introspection_source_index_ids
.into_keys()
.map(|name| (cluster.id, name)),
);
}
txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
txn.remove_introspection_source_indexes(removed_indexes)?;
Ok(())
}
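/// Creates any builtin roles that are missing from the durable catalog and removes system or
/// predefined roles that are no longer builtin.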
fn add_new_remove_old_builtin_roles_migration(
txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<(), mz_catalog::durable::CatalogError> {
let mut durable_roles: BTreeMap<_, _> = txn
.get_roles()
.filter(|role| role.id.is_system() || role.id.is_predefined())
.map(|role| (role.name.to_string(), role))
.collect();
for builtin_role in BUILTIN_ROLES {
if durable_roles.remove(builtin_role.name).is_none() {
txn.insert_builtin_role(
builtin_role.id,
builtin_role.name.to_string(),
builtin_role.attributes.clone(),
RoleMembership::new(),
RoleVars::default(),
builtin_role.oid,
)?;
}
}
let old_roles = durable_roles.values().map(|role| role.id).collect();
txn.remove_roles(&old_roles)?;
Ok(())
}
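/// Creates any builtin cluster replicas that are missing from the durable catalog and removes
/// system replicas that are no longer builtin. A missing builtin replica is only created when
/// its cluster's bootstrap replication factor is nonzero.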
fn add_new_remove_old_builtin_cluster_replicas_migration(
txn: &mut Transaction<'_>,
builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
cluster_sizes: &ClusterReplicaSizeMap,
) -> Result<(), AdapterError> {
let cluster_lookup: BTreeMap<_, _> = txn
.get_clusters()
.map(|cluster| (cluster.name.clone(), cluster.clone()))
.collect();
let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
.get_cluster_replicas()
.filter(|replica| replica.replica_id.is_system())
.fold(BTreeMap::new(), |mut acc, replica| {
acc.entry(replica.cluster_id)
.or_insert_with(BTreeMap::new)
.insert(replica.name.to_string(), replica);
acc
});
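    // Any durable system replica not matched by a builtin replica below is removed at the end.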
for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
let cluster = cluster_lookup
.get(builtin_replica.cluster_name)
.expect("builtin cluster replica references non-existent cluster");
let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
let replica_names = durable_replicas
.get_mut(&cluster.id)
.unwrap_or(&mut empty_map);
        let builtin_cluster_bootstrap_config =
            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
        if replica_names.remove(builtin_replica.name).is_none()
            && builtin_cluster_bootstrap_config.replication_factor > 0
        {
            let replica_size = match cluster.config.variant {
                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
                ClusterVariant::Unmanaged => builtin_cluster_bootstrap_config.size,
            };
let replica_allocation = cluster_sizes.get_allocation_by_name(&replica_size)?;
let config = builtin_cluster_replica_config(replica_size, replica_allocation);
txn.insert_cluster_replica(
cluster.id,
builtin_replica.name,
config,
MZ_SYSTEM_ROLE_ID,
)?;
}
}
let old_replicas = durable_replicas
.values()
.flat_map(|replicas| replicas.values().map(|replica| replica.replica_id))
.collect();
txn.remove_cluster_replicas(&old_replicas)?;
Ok(())
}
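/// Roles can carry default values for configuration parameters, e.g. `ALTER ROLE r SET cluster
/// TO foo`. This migration removes any role defaults that name parameters that no longer exist
/// or hold values that are no longer valid.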
fn remove_invalid_config_param_role_defaults_migration(
txn: &mut Transaction<'_>,
) -> Result<(), AdapterError> {
static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
let roles_to_migrate: BTreeMap<_, _> = txn
.get_roles()
.filter_map(|mut role| {
let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
let mut invalid_roles_vars = BTreeMap::new();
for (name, value) in &role.vars.map {
let Ok(session_var) = session_vars.inspect(name) else {
invalid_roles_vars.insert(name.clone(), value.clone());
continue;
};
if session_var.check(value.borrow()).is_err() {
invalid_roles_vars.insert(name.clone(), value.clone());
}
}
if invalid_roles_vars.is_empty() {
return None;
}
tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
for (name, _value) in invalid_roles_vars {
role.vars.map.remove(&name);
}
Some(role)
})
.map(|role| (role.id, role))
.collect();
txn.update_roles(roles_to_migrate)?;
Ok(())
}
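/// Removes all cluster replicas that are marked as pending, i.e. replicas that were created
/// ephemerally (presumably by an interrupted operation) and never promoted.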
fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
for replica in tx.get_cluster_replicas() {
if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
replica.config.location
{
tx.remove_cluster_replica(replica.replica_id)?;
}
}
Ok(())
}
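/// Builds the durable replica config used for builtin cluster replicas: a managed location of
/// the given size with default logging.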
pub(crate) fn builtin_cluster_replica_config(
replica_size: String,
replica_allocation: &ReplicaAllocation,
) -> mz_catalog::durable::ReplicaConfig {
mz_catalog::durable::ReplicaConfig {
location: mz_catalog::durable::ReplicaLocation::Managed {
availability_zone: None,
billed_as: None,
disk: replica_allocation.is_cc,
pending: false,
internal: false,
size: replica_size,
},
logging: default_logging_config(),
}
}
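/// The logging configuration used for builtin replicas: a one-second introspection interval and
/// no log logging.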
fn default_logging_config() -> ReplicaLogging {
ReplicaLogging {
log_logging: false,
interval: Some(Duration::from_secs(1)),
}
}
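/// The bootstrap configurations for each builtin cluster, as supplied by the caller of
/// [`Catalog::initialize_state`].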
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
pub system_cluster: BootstrapBuiltinClusterConfig,
pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
pub probe_cluster: BootstrapBuiltinClusterConfig,
pub support_cluster: BootstrapBuiltinClusterConfig,
pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
impl BuiltinBootstrapClusterConfigMap {
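    /// Returns the bootstrap config for the builtin cluster with the given name, or an
    /// `UnexpectedBuiltinCluster` error if the name is not a known builtin cluster.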
fn get_config(
&self,
cluster_name: &str,
) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
&self.system_cluster
} else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
&self.catalog_server_cluster
} else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
&self.probe_cluster
} else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
&self.support_cluster
} else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
&self.analytics_cluster
} else {
return Err(mz_catalog::durable::CatalogError::Catalog(
SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
));
};
Ok(cluster_config.clone())
}
}
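/// Converts `updates` into tuples that implement [`Ord`] and stamps them all with `ts`, so that
/// they can be consolidated. Panics if any update refers to a temporary item, since temporary
/// items cannot exist during bootstrap.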
pub(crate) fn into_consolidatable_updates_startup(
updates: Vec<StateUpdate>,
ts: Timestamp,
) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
updates
.into_iter()
.map(|StateUpdate { kind, ts: _, diff }| {
let kind: BootstrapStateUpdateKind = kind
.try_into()
.unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
(kind, ts, Diff::from(diff))
})
.collect()
}
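/// Reads the value of a dyncfg before the dyncfg system itself is initialized, consulting the
/// compiled-in default, then the system parameter defaults, then the remote parameter overrides,
/// in increasing order of precedence. Unparseable values are logged and ignored.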
fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
defaults: &BTreeMap<String, String>,
remote: Option<&BTreeMap<String, String>>,
cfg: &mz_dyncfg::Config<T>,
) -> T::ConfigType {
let mut val = T::into_config_type(cfg.default().clone());
let get_fn = |map: &BTreeMap<String, String>| {
let val = map.get(cfg.name())?;
match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
Ok(x) => Some(x),
Err(err) => {
tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
None
}
}
};
if let Some(x) = get_fn(defaults) {
val = x;
}
if let Some(x) = remote.and_then(get_fn) {
val = x;
}
val
}