1mod builtin_schema_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU32;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use futures::future::{BoxFuture, FutureExt};
20use itertools::{Either, Itertools};
21use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
22use mz_adapter_types::dyncfgs::ENABLE_EXPRESSION_CACHE;
23use mz_audit_log::{
24 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, ObjectType, VersionedEvent,
25};
26use mz_auth::hash::scram256_hash;
27use mz_catalog::SYSTEM_CONN_ID;
28use mz_catalog::builtin::{
29 BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
30 Fingerprint, MZ_CATALOG_RAW, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
31};
32use mz_catalog::config::StateConfig;
33use mz_catalog::durable::objects::{
34 SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
35};
36use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
37use mz_catalog::expr_cache::{
38 ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
39};
40use mz_catalog::memory::error::{Error, ErrorKind};
41use mz_catalog::memory::objects::{
42 BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
43};
44use mz_controller::clusters::ReplicaLogging;
45use mz_controller_types::ClusterId;
46use mz_ore::cast::usize_to_u64;
47use mz_ore::collections::HashSet;
48use mz_ore::now::{SYSTEM_TIME, to_datetime};
49use mz_ore::{instrument, soft_assert_no_log};
50use mz_repr::adt::mz_acl_item::PrivilegeMap;
51use mz_repr::namespaces::is_unstable_schema;
52use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
53use mz_sql::catalog::{CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars};
54use mz_sql::func::OP_IMPLS;
55use mz_sql::names::CommentObjectId;
56use mz_sql::rbac;
57use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
58use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
59use mz_storage_client::controller::{StorageMetadata, StorageTxn};
60use mz_storage_client::storage_collections::StorageCollections;
61use tracing::{Instrument, info, warn};
62use uuid::Uuid;
63
64use crate::AdapterError;
66use crate::catalog::migrate::{self, get_migration_version, set_migration_version};
67use crate::catalog::state::LocalExpressionCache;
68use crate::catalog::{BuiltinTableUpdate, Catalog, CatalogState, Config, is_reserved_name};
69
/// The outcome of [`Catalog::initialize_state`]: the in-memory catalog state plus
/// all artifacts produced while opening and migrating the durable catalog.
pub struct InitializeStateResult {
    /// The fully initialized in-memory catalog state.
    pub state: CatalogState,
    /// Catalog items whose backing storage collections were replaced by the
    /// builtin schema migration (`builtin_schema_migration::run`).
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Global IDs of builtin collections newly allocated during this open.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// Updates that must be applied to builtin tables to reflect the opened state.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// The migration version recorded by the previous process, or `"new"` if the
    /// durable catalog had none recorded.
    pub last_seen_version: String,
    /// Handle to the expression cache, if the cache was enabled at startup.
    pub expr_cache_handle: Option<ExpressionCacheHandle>,
    /// Global (optimized) expressions read from the expression cache at startup.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// Local expressions that were needed at startup but absent from the cache.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
88
/// The outcome of [`Catalog::open`]: an opened catalog plus the startup
/// artifacts forwarded from [`Catalog::initialize_state`].
pub struct OpenCatalogResult {
    /// The opened catalog.
    pub catalog: Catalog,
    /// Catalog items whose backing storage collections were replaced by the
    /// builtin schema migration.
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Global IDs of builtin collections newly allocated during this open.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// Updates that must be applied to builtin tables to reflect the opened state.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Global (optimized) expressions read from the expression cache at startup.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// Local expressions that were needed at startup but absent from the cache.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
103
104impl Catalog {
    /// Initializes an in-memory [`CatalogState`] from the durable catalog in
    /// `storage`, running all startup migrations along the way, and commits a
    /// durable transaction at `config.boot_ts`.
    ///
    /// Returns an [`InitializeStateResult`] carrying the state, builtin table
    /// updates, the previously recorded migration version, and expression
    /// cache artifacts.
    pub async fn initialize_state<'a>(
        config: StateConfig,
        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
    ) -> Result<InitializeStateResult, AdapterError> {
        // Builtin roles and clusters must use reserved name prefixes so they
        // can never collide with user-created objects.
        for builtin_role in BUILTIN_ROLES {
            assert!(
                is_reserved_name(builtin_role.name),
                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }
        for builtin_cluster in BUILTIN_CLUSTERS {
            assert!(
                is_reserved_name(builtin_cluster.name),
                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }

        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
        if config.all_features {
            system_configuration.enable_all_feature_flags_by_default();
        }

        // Start from an empty state; it is populated below by applying the
        // updates read back from the durable catalog.
        let mut state = CatalogState {
            database_by_name: imbl::OrdMap::new(),
            database_by_id: imbl::OrdMap::new(),
            entry_by_id: imbl::OrdMap::new(),
            entry_by_global_id: imbl::OrdMap::new(),
            notices_by_dep_id: imbl::OrdMap::new(),
            ambient_schemas_by_name: imbl::OrdMap::new(),
            ambient_schemas_by_id: imbl::OrdMap::new(),
            clusters_by_name: imbl::OrdMap::new(),
            clusters_by_id: imbl::OrdMap::new(),
            roles_by_name: imbl::OrdMap::new(),
            roles_by_id: imbl::OrdMap::new(),
            network_policies_by_id: imbl::OrdMap::new(),
            role_auth_by_id: imbl::OrdMap::new(),
            network_policies_by_name: imbl::OrdMap::new(),
            system_configuration: Arc::new(system_configuration),
            default_privileges: Arc::new(DefaultPrivileges::default()),
            system_privileges: Arc::new(PrivilegeMap::default()),
            comments: Arc::new(CommentsMap::default()),
            source_references: imbl::OrdMap::new(),
            storage_metadata: Arc::new(StorageMetadata::default()),
            temporary_schemas: imbl::OrdMap::new(),
            mock_authentication_nonce: Default::default(),
            config: mz_sql::catalog::CatalogConfig {
                start_time: to_datetime((config.now)()),
                start_instant: Instant::now(),
                nonce: rand::random(),
                environment_id: config.environment_id,
                session_id: Uuid::new_v4(),
                build_info: config.build_info,
                now: config.now.clone(),
                connection_context: config.connection_context,
                helm_chart_version: config.helm_chart_version,
            },
            cluster_replica_sizes: config.cluster_replica_sizes,
            availability_zones: config.availability_zones,
            egress_addresses: config.egress_addresses,
            aws_principal_context: config.aws_principal_context,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            http_host_name: config.http_host_name,
            license_key: config.license_key,
        };

        let deploy_generation = storage.get_deployment_generation().await?;

        // Snapshot the current durable catalog contents, then open a
        // transaction in which all startup migrations are staged.
        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
        let mut txn = storage.transaction().await?;

        // Run durable-catalog migrations and reconcile builtin objects
        // (items, clusters, introspection sources, replicas, roles) with the
        // current binary's definitions.
        let new_builtin_collections = {
            migrate::durable_migrate(
                &mut txn,
                state.config.environment_id.organization_id(),
                config.boot_ts,
            )?;
            // Remote (LaunchDarkly-style) parameters override durable system
            // config before anything reads it.
            if let Some(remote_system_parameters) = config.remote_system_parameters {
                for (name, value) in remote_system_parameters {
                    txn.upsert_system_config(&name, value)?;
                }
                txn.set_system_config_synced_once()?;
            }
            let new_builtin_collections = add_new_remove_old_builtin_items_migration(&mut txn)?;
            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
                system_cluster: config.builtin_system_cluster_config,
                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
                probe_cluster: config.builtin_probe_cluster_config,
                support_cluster: config.builtin_support_cluster_config,
                analytics_cluster: config.builtin_analytics_cluster_config,
            };
            add_new_remove_old_builtin_clusters_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                config.boot_ts,
            )?;
            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
            add_new_remove_old_builtin_cluster_replicas_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                config.boot_ts,
            )?;
            add_new_remove_old_builtin_roles_migration(&mut txn)?;
            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
            remove_pending_cluster_replicas_migration(&mut txn, config.boot_ts)?;

            new_builtin_collections
        };

        // Fold the updates staged by the migrations above into the snapshot.
        let op_updates = txn.get_and_commit_op_updates();
        updates.extend(op_updates);

        let mut builtin_table_updates = Vec::new();

        {
            // Seed system parameter defaults; unknown parameters are logged
            // and skipped rather than failing startup.
            for (name, value) in config.system_parameter_defaults {
                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
                    Ok(_) => (),
                    Err(Error {
                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
                    }) => {
                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
                    }
                    Err(e) => return Err(e.into()),
                };
            }
            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
        }

        // Consolidate the snapshot; after consolidation every surviving
        // update should be a pure addition (diff == 1).
        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
        differential_dataflow::consolidation::consolidate_updates(&mut updates);
        soft_assert_no_log!(
            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "consolidated updates should be positive during startup: {updates:?}"
        );

        // Partition updates into dependency-ordered phases: non-item state
        // first, then system items, then user items, then metadata that may
        // reference items, with audit logs handled separately at the end.
        let mut pre_item_updates = Vec::new();
        let mut system_item_updates = Vec::new();
        let mut item_updates = Vec::new();
        let mut post_item_updates = Vec::new();
        let mut audit_log_updates = Vec::new();
        for (kind, ts, diff) in updates {
            match kind {
                BootstrapStateUpdateKind::Role(_)
                | BootstrapStateUpdateKind::RoleAuth(_)
                | BootstrapStateUpdateKind::Database(_)
                | BootstrapStateUpdateKind::Schema(_)
                | BootstrapStateUpdateKind::DefaultPrivilege(_)
                | BootstrapStateUpdateKind::SystemPrivilege(_)
                | BootstrapStateUpdateKind::SystemConfiguration(_)
                | BootstrapStateUpdateKind::Cluster(_)
                | BootstrapStateUpdateKind::NetworkPolicy(_)
                | BootstrapStateUpdateKind::ClusterReplica(_) => {
                    pre_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
                    system_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
                    kind: kind.into(),
                    ts,
                    diff: diff.try_into().expect("valid diff"),
                }),
                BootstrapStateUpdateKind::Comment(_)
                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
                | BootstrapStateUpdateKind::SourceReferences(_)
                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
                    // Kept in raw (kind, ts, diff) form: item migrations below
                    // may append to and re-consolidate this phase.
                    post_item_updates.push((kind, ts, diff));
                }
                BootstrapStateUpdateKind::AuditLog(_) => {
                    audit_log_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    });
                }
            }
        }

        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(pre_item_updates, &mut LocalExpressionCache::Closed)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        {
            // Optionally install an externally supplied mz_system password
            // (in-memory only) so external logins can authenticate.
            if let Some(password) = config.external_login_password_mz_system {
                let role_auth = RoleAuth {
                    role_id: MZ_SYSTEM_ROLE_ID,
                    password_hash: Some(
                        scram256_hash(&password, &NonZeroU32::new(600_000).expect("known valid"))
                            .map_err(|_| {
                                AdapterError::Internal("Failed to hash mz_system password.".to_owned())
                            })?,
                    ),
                    updated_at: SYSTEM_TIME(),
                };
                state
                    .role_auth_by_id
                    .insert(MZ_SYSTEM_ROLE_ID, role_auth.clone());
                let builtin_table_update = state.generate_builtin_table_update(
                    mz_catalog::memory::objects::StateUpdateKind::RoleAuth(role_auth.into()),
                    mz_catalog::memory::objects::StateDiff::Addition,
                );
                builtin_table_updates.extend(builtin_table_update);
            }
        }

        // Open the expression cache (if enabled) before applying item updates
        // so item parsing/optimization below can be served from the cache.
        let expr_cache_start = Instant::now();
        info!("startup: coordinator init: catalog open: expr cache open beginning");
        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
        let expr_cache_enabled = config
            .enable_expression_cache_override
            .unwrap_or(enable_expr_cache_dyncfg);
        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
            info!(
                ?config.enable_expression_cache_override,
                ?enable_expr_cache_dyncfg,
                "using expression cache for startup"
            );
            // Collect every live global ID (user items including all extra
            // versions, plus system objects) so the cache can evict the rest.
            let current_ids = txn
                .get_items()
                .flat_map(|item| {
                    let gid = item.global_id.clone();
                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
                    std::iter::once(gid).chain(gids)
                })
                .chain(
                    txn.get_system_object_mappings()
                        .map(|som| som.unique_identifier.global_id),
                )
                .collect();
            let dyncfgs = config.persist_client.dyncfgs().clone();
            // Dev builds key the cache on the full version-with-build-ID so
            // different dev binaries never share cache entries.
            let build_version = if config.build_info.is_dev() {
                config
                    .build_info
                    .semver_version_build()
                    .expect("build ID is not available on your platform!")
            } else {
                config.build_info.semver_version()
            };
            let expr_cache_config = ExpressionCacheConfig {
                build_version,
                shard_id: txn
                    .get_expression_cache_shard()
                    .expect("expression cache shard should exist for opened catalogs"),
                persist: config.persist_client,
                current_ids,
                remove_prior_versions: !config.read_only,
                compact_shard: config.read_only,
                dyncfgs,
            };
            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
            (
                Some(expr_cache_handle),
                cached_local_exprs,
                cached_global_exprs,
            )
        } else {
            (None, BTreeMap::new(), BTreeMap::new())
        };
        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
        info!(
            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
            expr_cache_start.elapsed()
        );

        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(system_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        let last_seen_version =
            get_migration_version(&txn).map_or_else(|| "new".into(), |v| v.to_string());

        let mz_authentication_mock_nonce =
            txn.get_authentication_mock_nonce().ok_or_else(|| {
                Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
            })?;

        state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);

        // Apply user item updates — via the item migration framework unless
        // migrations are explicitly skipped.
        let (builtin_table_update, _catalog_updates) = if !config.skip_migrations {
            let migrate_result = migrate::migrate(
                &mut state,
                &mut txn,
                &mut local_expr_cache,
                item_updates,
                config.now,
                config.boot_ts,
            )
            .await
            .map_err(|e| {
                Error::new(ErrorKind::FailedCatalogMigration {
                    last_seen_version: last_seen_version.clone(),
                    this_version: config.build_info.version,
                    cause: e.to_string(),
                })
            })?;
            if !migrate_result.post_item_updates.is_empty() {
                post_item_updates.extend(migrate_result.post_item_updates);
                // Align all post-item updates to a single timestamp so the
                // subsequent consolidation can cancel matching retractions.
                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
                    for (_, ts, _) in &mut post_item_updates {
                        *ts = max_ts;
                    }
                }
                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
            }

            (
                migrate_result.builtin_table_updates,
                migrate_result.catalog_updates,
            )
        } else {
            state
                .apply_updates(item_updates, &mut local_expr_cache)
                .await
        };
        builtin_table_updates.extend(builtin_table_update);

        let post_item_updates = post_item_updates
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate {
                kind: kind.into(),
                ts,
                diff: diff.try_into().expect("valid diff"),
            })
            .collect();
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(post_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // Audit log events only feed builtin tables; they are never applied
        // to the catalog state itself.
        for audit_log_update in audit_log_updates {
            builtin_table_updates.extend(
                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
            );
        }

        // Migrate builtin schemas; any updates it stages are applied below.
        let schema_migration_result = builtin_schema_migration::run(
            config.build_info,
            deploy_generation,
            &mut txn,
            config.builtin_item_migration_config,
        )
        .await?;

        let state_updates = txn.get_and_commit_op_updates();

        let (table_updates, _catalog_updates) = state
            .apply_updates(state_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(table_updates);
        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);

        // Record this binary's version as the last successful migration
        // version, then durably commit everything staged above.
        set_migration_version(&mut txn, config.build_info.semver_version())?;

        txn.commit(config.boot_ts).await?;

        // Deferred cleanup from the schema migration runs only after the
        // transaction has committed.
        schema_migration_result.cleanup_action.await;

        Ok(InitializeStateResult {
            state,
            migrated_storage_collections_0dt: schema_migration_result.replaced_items,
            new_builtin_collections: new_builtin_collections.into_iter().collect(),
            builtin_table_updates,
            last_seen_version,
            expr_cache_handle,
            cached_global_exprs,
            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
        })
    }
529
    /// Opens a [`Catalog`] from `config.storage`, delegating the heavy lifting
    /// to [`Catalog::initialize_state`] and then packing builtin table updates
    /// for operators, egress IPs, and the license key.
    ///
    /// Returns a boxed future (the state machine is large; boxing keeps the
    /// caller's future small).
    #[instrument(name = "catalog::open")]
    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
        async move {
            let mut storage = config.storage;

            let InitializeStateResult {
                state,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                mut builtin_table_updates,
                last_seen_version: _,
                expr_cache_handle,
                cached_global_exprs,
                uncached_local_exprs,
            } =
                Self::initialize_state(config.state, &mut storage)
                    .instrument(tracing::info_span!("catalog::initialize_state"))
                    .boxed()
                    .await?;

            let catalog = Catalog {
                state,
                expr_cache_handle,
                transient_revision: 1,
                storage: Arc::new(tokio::sync::Mutex::new(storage)),
            };

            // Emit one builtin table row per operator implementation; only
            // scalar functions are expected in OP_IMPLS.
            for (op, func) in OP_IMPLS.iter() {
                match func {
                    mz_sql::func::Func::Scalar(impls) => {
                        for imp in impls {
                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
                            ));
                        }
                    }
                    _ => unreachable!("all operators must be scalar functions"),
                }
            }

            for ip in &catalog.state.egress_addresses {
                builtin_table_updates.push(
                    catalog
                        .state
                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
                );
            }

            // Only publish license key info when one is actually configured.
            if !catalog.state.license_key.id.is_empty() {
                builtin_table_updates.push(
                    catalog.state.resolve_builtin_table_update(
                        catalog
                            .state
                            .pack_license_key_update(&catalog.state.license_key)?,
                    ),
                );
            }

            catalog.storage().await.mark_bootstrap_complete().await;

            Ok(OpenCatalogResult {
                catalog,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                builtin_table_updates,
                cached_global_exprs,
                uncached_local_exprs,
            })
        }
        .instrument(tracing::info_span!("catalog::open"))
        .boxed()
    }
617
    /// Initializes the storage controller's durable state: records the catalog
    /// shard's collection metadata and hands `storage_collections` all global
    /// IDs of storage-backed catalog items, committing the result.
    ///
    /// The resulting catalog updates must not change builtin tables or
    /// catalog/controller state (asserted below).
    async fn initialize_storage_state(
        &mut self,
        storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
    ) -> Result<(), mz_catalog::durable::CatalogError> {
        // All global IDs of items that are backed by a storage collection.
        let collections = self
            .entries()
            .filter(|entry| entry.item().is_storage_collection())
            .flat_map(|entry| entry.global_ids())
            .collect();

        // Work on a clone; `self.state` is only replaced after the durable
        // transaction commits successfully.
        let mut state = self.state.clone();

        let mut storage = self.storage().await;
        let shard_id = storage.shard_id();
        let mut txn = storage.transaction().await?;

        // Register (or validate) the shard backing the mz_catalog_raw builtin
        // storage collection.
        let item_id = self.resolve_builtin_storage_collection(&MZ_CATALOG_RAW);
        let global_id = self.get_entry(&item_id).latest_global_id();
        match txn.get_collection_metadata().get(&global_id) {
            None => {
                txn.insert_collection_metadata([(global_id, shard_id)].into())
                    .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            }
            // Already registered: the recorded shard must match ours.
            Some(id) => assert_eq!(*id, shard_id),
        }

        storage_collections
            .initialize_state(&mut txn, collections)
            .await
            .map_err(mz_catalog::durable::DurableCatalogError::from)?;

        let updates = txn.get_and_commit_op_updates();
        let (builtin_updates, catalog_updates) = state
            .apply_updates(updates, &mut LocalExpressionCache::Closed)
            .await;
        assert!(
            builtin_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would cause changes to builtin tables"
        );
        assert!(
            catalog_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would change the catalog or controller state"
        );
        let commit_ts = txn.upper();
        txn.commit(commit_ts).await?;
        drop(storage);

        // Commit succeeded; install the updated state.
        self.state = state;
        Ok(())
    }
679
    /// Creates and initializes the cluster/storage controller, then wires its
    /// storage collections into the catalog via
    /// [`Catalog::initialize_storage_state`].
    pub async fn initialize_controller(
        &mut self,
        config: mz_controller::ControllerConfig,
        envd_epoch: core::num::NonZeroI64,
        read_only: bool,
    ) -> Result<mz_controller::Controller, mz_catalog::durable::CatalogError> {
        let controller_start = Instant::now();
        info!("startup: controller init: beginning");

        let controller = {
            let mut storage = self.storage().await;
            // Prepare durable state the controller requires; this must not
            // produce any catalog updates (asserted below).
            let mut tx = storage.transaction().await?;
            mz_controller::prepare_initialization(&mut tx)
                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            let updates = tx.get_and_commit_op_updates();
            assert!(
                updates.is_empty(),
                "initializing controller should not produce updates: {updates:?}"
            );
            let commit_ts = tx.upper();
            tx.commit(commit_ts).await?;

            // A second, read-only transaction for the controller to read its
            // initial state from.
            let read_only_tx = storage.transaction().await?;

            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
        };

        self.initialize_storage_state(&controller.storage_collections)
            .await?;

        info!(
            "startup: controller init: complete in {:?}",
            controller_start.elapsed()
        );

        Ok(controller)
    }
719
720 pub async fn expire(self) {
722 if let Some(storage) = Arc::into_inner(self.storage) {
725 let storage = storage.into_inner();
726 storage.expire().await;
727 }
728 }
729}
730
731impl CatalogState {
732 fn set_system_configuration_default(
734 &mut self,
735 name: &str,
736 value: VarInput,
737 ) -> Result<(), Error> {
738 Ok(Arc::make_mut(&mut self.system_configuration).set_default(name, value)?)
739 }
740}
741
/// Reconciles the durable catalog's system object mappings with the builtin
/// items compiled into this binary: records new builtins (allocating IDs),
/// refreshes builtin column comments, and removes mappings for builtins that
/// no longer exist.
///
/// Returns the global IDs allocated for newly added builtin items.
///
/// Panics if two builtins share a description or if a removed object lives in
/// a stable schema (removals are only allowed from unstable schemas).
fn add_new_remove_old_builtin_items_migration(
    txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<Vec<GlobalId>, mz_catalog::durable::CatalogError> {
    let mut new_builtin_mappings = Vec::new();
    // Used only to detect duplicate builtin descriptions.
    let mut builtin_descs = HashSet::new();

    // Pair every compiled-in builtin with its (schema, type, name) description.
    let mut builtins = Vec::new();
    for builtin in BUILTINS::iter() {
        let desc = SystemObjectDescription {
            schema_name: builtin.schema().to_string(),
            object_type: builtin.catalog_item_type(),
            object_name: builtin.name().to_string(),
        };
        if !builtin_descs.insert(desc.clone()) {
            panic!(
                "duplicate builtin description: {:?}, {:?}",
                SystemObjectDescription {
                    schema_name: builtin.schema().to_string(),
                    object_type: builtin.catalog_item_type(),
                    object_name: builtin.name().to_string(),
                },
                builtin
            );
        }
        builtins.push((desc, builtin));
    }

    // Index the durable mappings by description; entries are removed as they
    // are matched, so whatever remains at the end is stale.
    let mut system_object_mappings: BTreeMap<_, _> = txn
        .get_system_object_mappings()
        .map(|system_object_mapping| {
            (
                system_object_mapping.description.clone(),
                system_object_mapping,
            )
        })
        .collect();

    // Split builtins into those already durably mapped and brand-new ones.
    // Runtime-alterable objects get a sentinel fingerprint since their
    // definitions can diverge from the compiled-in SQL.
    let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
        builtins.into_iter().partition_map(|(desc, builtin)| {
            let fingerprint = match builtin.runtime_alterable() {
                false => builtin.fingerprint(),
                true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
            };
            match system_object_mappings.remove(&desc) {
                Some(system_object_mapping) => {
                    Either::Left((builtin, system_object_mapping, fingerprint))
                }
                None => Either::Right((builtin, fingerprint)),
            }
        });
    // Allocate one (catalog_id, global_id) pair per new builtin.
    let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
    let new_builtins: Vec<_> = new_builtins
        .into_iter()
        .zip_eq(new_builtin_ids.clone())
        .collect();

    for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
        new_builtin_mappings.push(SystemObjectMapping {
            description: SystemObjectDescription {
                schema_name: builtin.schema().to_string(),
                object_type: builtin.catalog_item_type(),
                object_name: builtin.name().to_string(),
            },
            unique_identifier: SystemObjectUniqueIdentifier {
                catalog_id,
                global_id,
                fingerprint,
            },
        });

        // Runtime-alterable connections additionally get a durable item entry
        // (they behave like user items after creation); every runtime-alterable
        // builtin must be handled here, enforced by the assert below.
        let handled_runtime_alterable = match builtin {
            Builtin::Connection(c) if c.runtime_alterable => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    c.owner_id.clone(),
                )];
                acl_items.extend_from_slice(c.access);
                let versions = BTreeMap::new();

                txn.insert_item(
                    catalog_id,
                    c.oid,
                    global_id,
                    mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
                    c.name,
                    c.sql.into(),
                    *c.owner_id,
                    acl_items,
                    versions,
                )?;
                true
            }
            _ => false,
        };
        assert_eq!(
            builtin.runtime_alterable(),
            handled_runtime_alterable,
            "runtime alterable object was not handled by migration",
        );
    }
    txn.set_system_object_mappings(new_builtin_mappings)?;

    // Every live builtin (pre-existing and new) paired with its catalog ID,
    // for refreshing column comments below.
    let builtins_with_catalog_ids = existing_builtins
        .iter()
        .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
        .chain(
            new_builtins
                .into_iter()
                .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
        );

    for (builtin, id) in builtins_with_catalog_ids {
        // Only relation-like builtins carry column comments.
        let (comment_id, desc, comments) = match builtin {
            Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
            Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
            Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
            Builtin::MaterializedView(mv) => (
                CommentObjectId::MaterializedView(id),
                &mv.desc,
                &mv.column_comments,
            ),
            Builtin::Log(_)
            | Builtin::Type(_)
            | Builtin::Func(_)
            | Builtin::Index(_)
            | Builtin::Connection(_) => continue,
        };
        // Drop any stale comments, then re-write from the compiled-in set.
        txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;

        let mut comments = comments.clone();
        for (col_idx, name) in desc.iter_names().enumerate() {
            if let Some(comment) = comments.remove(name.as_str()) {
                // Column positions are 1-based in comment storage.
                txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
            }
        }
        assert!(
            comments.is_empty(),
            "builtin object contains dangling comments that don't correspond to columns {comments:?}"
        );
    }

    // Whatever is left in `system_object_mappings` was not matched by any
    // compiled-in builtin: it must be removed.
    let mut deleted_system_objects = BTreeSet::new();
    let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
    let mut deleted_comments = BTreeSet::new();
    for (desc, mapping) in system_object_mappings {
        deleted_system_objects.insert(mapping.description);
        // Runtime-alterable objects also have a durable item entry to delete.
        if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
            deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
        }

        let id = mapping.unique_identifier.catalog_id;
        let comment_id = match desc.object_type {
            CatalogItemType::Table => CommentObjectId::Table(id),
            CatalogItemType::Source => CommentObjectId::Source(id),
            CatalogItemType::View => CommentObjectId::View(id),
            CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(id),
            CatalogItemType::Sink
            | CatalogItemType::Index
            | CatalogItemType::Type
            | CatalogItemType::Func
            | CatalogItemType::Secret
            | CatalogItemType::Connection => continue,
        };
        deleted_comments.insert(comment_id);
    }
    // Explicit allow-list for deletions from stable schemas (currently empty).
    let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
    assert!(
        deleted_system_objects
            .iter()
            .filter(|object| object.object_type != CatalogItemType::Index)
            .all(
                |deleted_object| is_unstable_schema(&deleted_object.schema_name)
                    || delete_exceptions.contains(deleted_object)
            ),
        "only objects in unstable schemas can be deleted, deleted objects: {:?}",
        deleted_system_objects
    );
    txn.drop_comments(&deleted_comments)?;
    txn.remove_items(&deleted_runtime_alterable_system_ids)?;
    txn.remove_system_object_mappings(deleted_system_objects)?;

    let new_builtin_collections = new_builtin_ids
        .into_iter()
        .map(|(_catalog_id, global_id)| global_id)
        .collect();

    Ok(new_builtin_collections)
}
958
/// Reconciles durable system clusters with the builtin clusters compiled into
/// this binary: creates missing builtin clusters (with an audit log entry) and
/// removes system clusters that are no longer builtins (also audited).
fn add_new_remove_old_builtin_clusters_migration(
    txn: &mut mz_catalog::durable::Transaction<'_>,
    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
    boot_ts: Timestamp,
) -> Result<(), mz_catalog::durable::CatalogError> {
    // Index durable system clusters by name; matched entries are removed so
    // whatever remains afterwards is stale.
    let mut durable_clusters: BTreeMap<_, _> = txn
        .get_clusters()
        .filter(|cluster| cluster.id.is_system())
        .map(|cluster| (cluster.name.to_string(), cluster))
        .collect();

    for builtin_cluster in BUILTIN_CLUSTERS {
        if durable_clusters.remove(builtin_cluster.name).is_none() {
            // Builtin cluster is missing durably: create it using its
            // bootstrap size/replication-factor configuration.
            let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;

            let cluster_id = txn.insert_system_cluster(
                builtin_cluster.name,
                vec![],
                builtin_cluster.privileges.to_vec(),
                builtin_cluster.owner_id.to_owned(),
                mz_catalog::durable::ClusterConfig {
                    variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
                        size: cluster_config.size,
                        availability_zones: vec![],
                        replication_factor: cluster_config.replication_factor,
                        logging: default_logging_config(),
                        optimizer_feature_overrides: Default::default(),
                        schedule: Default::default(),
                    }),
                    workload_class: None,
                },
                &HashSet::new(),
            )?;

            // Record the creation in the audit log at boot time.
            let audit_id = txn.allocate_audit_log_id()?;
            txn.insert_audit_log_event(VersionedEvent::new(
                audit_id,
                EventType::Create,
                ObjectType::Cluster,
                EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                    id: cluster_id.to_string(),
                    name: builtin_cluster.name.to_string(),
                }),
                None,
                boot_ts.into(),
            ));
        }
    }

    // Remaining durable system clusters have no matching builtin: drop them.
    let old_clusters = durable_clusters
        .values()
        .map(|cluster| cluster.id)
        .collect();
    txn.remove_clusters(&old_clusters)?;

    // Record each drop in the audit log.
    for (_name, cluster) in &durable_clusters {
        let audit_id = txn.allocate_audit_log_id()?;
        txn.insert_audit_log_event(VersionedEvent::new(
            audit_id,
            EventType::Drop,
            ObjectType::Cluster,
            EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                id: cluster.id.to_string(),
                name: cluster.name.clone(),
            }),
            None,
            boot_ts.into(),
        ));
    }

    Ok(())
}
1033
1034fn add_new_remove_old_builtin_introspection_source_migration(
1035 txn: &mut mz_catalog::durable::Transaction<'_>,
1036) -> Result<(), AdapterError> {
1037 let mut new_indexes = Vec::new();
1038 let mut removed_indexes = BTreeSet::new();
1039 for cluster in txn.get_clusters() {
1040 let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
1041
1042 let mut new_logs = Vec::new();
1043
1044 for log in BUILTINS::logs() {
1045 if introspection_source_index_ids.remove(log.name).is_none() {
1046 new_logs.push(log);
1047 }
1048 }
1049
1050 for log in new_logs {
1051 let (item_id, gid) =
1052 Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1053 new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1054 }
1055
1056 removed_indexes.extend(
1059 introspection_source_index_ids
1060 .into_keys()
1061 .map(|name| (cluster.id, name.to_string())),
1062 );
1063 }
1064 txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1065 txn.remove_introspection_source_indexes(removed_indexes)?;
1066 Ok(())
1067}
1068
1069fn add_new_remove_old_builtin_roles_migration(
1070 txn: &mut mz_catalog::durable::Transaction<'_>,
1071) -> Result<(), mz_catalog::durable::CatalogError> {
1072 let mut durable_roles: BTreeMap<_, _> = txn
1073 .get_roles()
1074 .filter(|role| role.id.is_system() || role.id.is_predefined())
1075 .map(|role| (role.name.to_string(), role))
1076 .collect();
1077
1078 for builtin_role in BUILTIN_ROLES {
1080 if durable_roles.remove(builtin_role.name).is_none() {
1081 txn.insert_builtin_role(
1082 builtin_role.id,
1083 builtin_role.name.to_string(),
1084 builtin_role.attributes.clone(),
1085 RoleMembership::new(),
1086 RoleVars::default(),
1087 builtin_role.oid,
1088 )?;
1089 }
1090 }
1091
1092 let old_roles = durable_roles.values().map(|role| role.id).collect();
1094 txn.remove_roles(&old_roles)?;
1095
1096 Ok(())
1097}
1098
/// Reconciles durable system cluster replicas with the builtin replica set:
/// creates (and audit-logs) builtin replicas that are missing, and removes
/// (and audit-logs) durable system replicas that no longer correspond to a
/// builtin replica.
fn add_new_remove_old_builtin_cluster_replicas_migration(
    txn: &mut Transaction<'_>,
    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
    boot_ts: Timestamp,
) -> Result<(), AdapterError> {
    // All durable clusters, keyed by name, so builtin replicas (which name
    // their cluster) can be resolved to a cluster ID and config.
    let cluster_lookup: BTreeMap<_, _> = txn
        .get_clusters()
        .map(|cluster| (cluster.name.clone(), cluster.clone()))
        .collect();

    // Reverse lookup used only when emitting Drop audit events below.
    let cluster_id_to_name: BTreeMap<ClusterId, String> = cluster_lookup
        .values()
        .map(|cluster| (cluster.id, cluster.name.clone()))
        .collect();

    // Durable *system* replicas, grouped by cluster and keyed by replica name.
    // Entries matched against a builtin replica are removed in the loop below;
    // whatever survives is stale and gets dropped at the end.
    let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
        .get_cluster_replicas()
        .filter(|replica| replica.replica_id.is_system())
        .fold(BTreeMap::new(), |mut acc, replica| {
            acc.entry(replica.cluster_id)
                .or_insert_with(BTreeMap::new)
                .insert(replica.name.to_string(), replica);
            acc
        });

    for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
        let cluster = cluster_lookup
            .get(builtin_replica.cluster_name)
            .expect("builtin cluster replica references non-existent cluster");
        // If the cluster currently has no durable system replicas, use a
        // throwaway map so the `remove` below is a uniform no-op.
        let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
        let replica_names = durable_replicas
            .get_mut(&cluster.id)
            .unwrap_or(&mut empty_map);

        let builtin_cluster_bootstrap_config =
            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
        // Create the replica only if it is missing AND the bootstrap config
        // asks for at least one replica; a replication factor of 0 means the
        // builtin cluster should run with no replicas.
        if replica_names.remove(builtin_replica.name).is_none()
            && builtin_cluster_bootstrap_config.replication_factor > 0
        {
            // Managed clusters dictate the replica size; unmanaged ones fall
            // back to the bootstrap-configured size.
            let replica_size = match cluster.config.variant {
                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
                ClusterVariant::Unmanaged => builtin_cluster_bootstrap_config.size.clone(),
            };

            let config = builtin_cluster_replica_config(replica_size.clone());
            let replica_id = txn.insert_cluster_replica(
                cluster.id,
                builtin_replica.name,
                config,
                MZ_SYSTEM_ROLE_ID,
            )?;

            // Record the creation in the audit log, attributed to the system
            // and stamped with the boot timestamp.
            let audit_id = txn.allocate_audit_log_id()?;
            txn.insert_audit_log_event(VersionedEvent::new(
                audit_id,
                EventType::Create,
                ObjectType::ClusterReplica,
                EventDetails::CreateClusterReplicaV4(mz_audit_log::CreateClusterReplicaV4 {
                    cluster_id: cluster.id.to_string(),
                    cluster_name: cluster.name.clone(),
                    replica_id: Some(replica_id.to_string()),
                    replica_name: builtin_replica.name.to_string(),
                    logical_size: replica_size,
                    billed_as: None,
                    internal: false,
                    reason: CreateOrDropClusterReplicaReasonV1::System,
                    scheduling_policies: None,
                }),
                None,
                boot_ts.into(),
            ));
        }
    }

    // Everything left in `durable_replicas` is a system replica with no
    // matching builtin definition: remove it and audit-log the drop.
    let old_replicas: Vec<_> = durable_replicas
        .values()
        .flat_map(|replicas| replicas.values())
        .collect();
    let old_replica_ids = old_replicas.iter().map(|r| r.replica_id).collect();
    txn.remove_cluster_replicas(&old_replica_ids)?;

    for replica in &old_replicas {
        // The replica's cluster may itself have been removed already, in
        // which case we log a placeholder name.
        let cluster_name = cluster_id_to_name
            .get(&replica.cluster_id)
            .cloned()
            .unwrap_or_else(|| "<unknown>".to_string());

        let audit_id = txn.allocate_audit_log_id()?;
        txn.insert_audit_log_event(VersionedEvent::new(
            audit_id,
            EventType::Drop,
            ObjectType::ClusterReplica,
            EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
                cluster_id: replica.cluster_id.to_string(),
                cluster_name,
                replica_id: Some(replica.replica_id.to_string()),
                replica_name: replica.name.clone(),
                reason: CreateOrDropClusterReplicaReasonV1::System,
                scheduling_policies: None,
            }),
            None,
            boot_ts.into(),
        ));
    }

    Ok(())
}
1212
1213fn remove_invalid_config_param_role_defaults_migration(
1220 txn: &mut Transaction<'_>,
1221) -> Result<(), AdapterError> {
1222 static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1223
1224 let roles_to_migrate: BTreeMap<_, _> = txn
1225 .get_roles()
1226 .filter_map(|mut role| {
1227 let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1232
1233 let mut invalid_roles_vars = BTreeMap::new();
1235 for (name, value) in &role.vars.map {
1236 let Ok(session_var) = session_vars.inspect(name) else {
1238 invalid_roles_vars.insert(name.clone(), value.clone());
1239 continue;
1240 };
1241 if session_var.check(value.borrow()).is_err() {
1242 invalid_roles_vars.insert(name.clone(), value.clone());
1243 }
1244 }
1245
1246 if invalid_roles_vars.is_empty() {
1248 return None;
1249 }
1250
1251 tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1252
1253 for (name, _value) in invalid_roles_vars {
1255 role.vars.map.remove(&name);
1256 }
1257 Some(role)
1258 })
1259 .map(|role| (role.id, role))
1260 .collect();
1261
1262 txn.update_roles_without_auth(roles_to_migrate)?;
1263
1264 Ok(())
1265}
1266
1267fn remove_pending_cluster_replicas_migration(
1270 tx: &mut Transaction,
1271 boot_ts: mz_repr::Timestamp,
1272) -> Result<(), anyhow::Error> {
1273 let cluster_names: BTreeMap<_, _> = tx.get_clusters().map(|c| (c.id, c.name)).collect();
1275
1276 let occurred_at = boot_ts.into();
1277
1278 for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1279 if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1280 replica.config.location
1281 {
1282 let cluster_name = cluster_names
1283 .get(&replica.cluster_id)
1284 .cloned()
1285 .unwrap_or_else(|| "<unknown>".to_string());
1286
1287 info!(
1288 "removing pending cluster replica '{}' from cluster '{}'",
1289 replica.name, cluster_name,
1290 );
1291
1292 tx.remove_cluster_replica(replica.replica_id)?;
1293
1294 let audit_id = tx.allocate_audit_log_id()?;
1298 tx.insert_audit_log_event(VersionedEvent::new(
1299 audit_id,
1300 EventType::Drop,
1301 ObjectType::ClusterReplica,
1302 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1303 cluster_id: replica.cluster_id.to_string(),
1304 cluster_name,
1305 replica_id: Some(replica.replica_id.to_string()),
1306 replica_name: replica.name,
1307 reason: CreateOrDropClusterReplicaReasonV1::System,
1308 scheduling_policies: None,
1309 }),
1310 None,
1311 occurred_at,
1312 ));
1313 }
1314 }
1315 Ok(())
1316}
1317
1318pub(crate) fn builtin_cluster_replica_config(
1319 replica_size: String,
1320) -> mz_catalog::durable::ReplicaConfig {
1321 mz_catalog::durable::ReplicaConfig {
1322 location: mz_catalog::durable::ReplicaLocation::Managed {
1323 availability_zone: None,
1324 billed_as: None,
1325 pending: false,
1326 internal: false,
1327 size: replica_size,
1328 },
1329 logging: default_logging_config(),
1330 }
1331}
1332
1333fn default_logging_config() -> ReplicaLogging {
1334 ReplicaLogging {
1335 log_logging: false,
1336 interval: Some(Duration::from_secs(1)),
1337 }
1338}
1339
/// Bootstrap-time configuration (size, replication factor, …) for each
/// builtin cluster, resolved by name via [`BuiltinBootstrapClusterConfigMap::get_config`].
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Config for the `mz_system` cluster.
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Config for the `mz_catalog_server` cluster.
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Config for the `mz_probe` cluster.
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Config for the `mz_support` cluster.
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Config for the `mz_analytics` cluster.
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1353
1354impl BuiltinBootstrapClusterConfigMap {
1355 fn get_config(
1357 &self,
1358 cluster_name: &str,
1359 ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1360 let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1361 &self.system_cluster
1362 } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1363 &self.catalog_server_cluster
1364 } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1365 &self.probe_cluster
1366 } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1367 &self.support_cluster
1368 } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1369 &self.analytics_cluster
1370 } else {
1371 return Err(mz_catalog::durable::CatalogError::Catalog(
1372 SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1373 ));
1374 };
1375 Ok(cluster_config.clone())
1376 }
1377}
1378
1379pub(crate) fn into_consolidatable_updates_startup(
1396 updates: Vec<StateUpdate>,
1397 ts: Timestamp,
1398) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1399 updates
1400 .into_iter()
1401 .map(|StateUpdate { kind, ts: _, diff }| {
1402 let kind: BootstrapStateUpdateKind = kind
1403 .try_into()
1404 .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1405 (kind, ts, Diff::from(diff))
1406 })
1407 .collect()
1408}