1mod builtin_schema_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU32;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use futures::future::{BoxFuture, FutureExt};
20use itertools::{Either, Itertools};
21use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
22use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
23use mz_auth::hash::scram256_hash;
24use mz_catalog::SYSTEM_CONN_ID;
25use mz_catalog::builtin::{
26 BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
27 Fingerprint, MZ_CATALOG_RAW, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
28};
29use mz_catalog::config::StateConfig;
30use mz_catalog::durable::objects::{
31 SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
32};
33use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
34use mz_catalog::expr_cache::{
35 ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
36};
37use mz_catalog::memory::error::{Error, ErrorKind};
38use mz_catalog::memory::objects::{
39 BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
40};
41use mz_controller::clusters::ReplicaLogging;
42use mz_controller_types::ClusterId;
43use mz_ore::cast::usize_to_u64;
44use mz_ore::collections::HashSet;
45use mz_ore::now::{SYSTEM_TIME, to_datetime};
46use mz_ore::{instrument, soft_assert_no_log};
47use mz_repr::adt::mz_acl_item::PrivilegeMap;
48use mz_repr::namespaces::is_unstable_schema;
49use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
50use mz_sql::catalog::{
51 BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
52};
53use mz_sql::func::OP_IMPLS;
54use mz_sql::names::CommentObjectId;
55use mz_sql::rbac;
56use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
57use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
58use mz_storage_client::controller::{StorageMetadata, StorageTxn};
59use mz_storage_client::storage_collections::StorageCollections;
60use tracing::{Instrument, info, warn};
61use uuid::Uuid;
62
63use crate::AdapterError;
65use crate::catalog::migrate::{self, get_migration_version, set_migration_version};
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{
68 BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name,
69};
70
/// Everything produced by [`Catalog::initialize_state`] that the caller needs
/// to finish bootstrapping the catalog.
pub struct InitializeStateResult {
    /// The initialized in-memory catalog state.
    pub state: CatalogState,
    /// Items whose storage collections were replaced by the builtin schema
    /// migration (0dt upgrade path — TODO confirm exact semantics with
    /// `builtin_schema_migration::run`).
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Global IDs of builtin collections newly allocated during this boot.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// Builtin table updates accumulated while applying durable state.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// The migration version recorded by the previous boot, or `"new"` if the
    /// catalog had no recorded version.
    pub last_seen_version: String,
    /// Handle to the expression cache, if the cache was enabled.
    pub expr_cache_handle: Option<ExpressionCacheHandle>,
    /// Cached optimized (global) expressions read from the expression cache.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// Local expressions that were not present in the expression cache.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
89
/// Everything produced by [`Catalog::open`].
pub struct OpenCatalogResult {
    /// The opened catalog.
    pub catalog: Catalog,
    /// Items whose storage collections were replaced by the builtin schema
    /// migration (forwarded from [`InitializeStateResult`]).
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Global IDs of builtin collections newly allocated during this boot.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// Builtin table updates, including operator, egress IP, and license key
    /// rows added during `open`.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Cached optimized (global) expressions read from the expression cache.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// Local expressions that were not present in the expression cache.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
104
105impl Catalog {
    /// Initializes a [`CatalogState`] from the durable catalog in `storage`.
    ///
    /// This runs the bootstrap migrations (builtin items, clusters,
    /// introspection sources, replicas, roles, plus durable and item
    /// migrations), applies the durable state updates in dependency order,
    /// optionally opens the expression cache, and commits the bootstrap
    /// transaction at `config.boot_ts`.
    ///
    /// # Errors
    ///
    /// Returns an error if the durable catalog cannot be read or migrated, or
    /// if the bootstrap transaction fails to commit.
    pub async fn initialize_state<'a>(
        config: StateConfig,
        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
    ) -> Result<InitializeStateResult, AdapterError> {
        // Sanity check: all builtin roles and clusters must use reserved name
        // prefixes so they can never collide with user-created objects.
        for builtin_role in BUILTIN_ROLES {
            assert!(
                is_reserved_name(builtin_role.name),
                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }
        for builtin_cluster in BUILTIN_CLUSTERS {
            assert!(
                is_reserved_name(builtin_cluster.name),
                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }

        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
        if config.all_features {
            system_configuration.enable_all_feature_flags_by_default();
        }

        // Start from an empty in-memory state; it is populated below by
        // applying the durable catalog's updates.
        let mut state = CatalogState {
            database_by_name: imbl::OrdMap::new(),
            database_by_id: imbl::OrdMap::new(),
            entry_by_id: imbl::OrdMap::new(),
            entry_by_global_id: imbl::OrdMap::new(),
            ambient_schemas_by_name: imbl::OrdMap::new(),
            ambient_schemas_by_id: imbl::OrdMap::new(),
            clusters_by_name: imbl::OrdMap::new(),
            clusters_by_id: imbl::OrdMap::new(),
            roles_by_name: imbl::OrdMap::new(),
            roles_by_id: imbl::OrdMap::new(),
            network_policies_by_id: imbl::OrdMap::new(),
            role_auth_by_id: imbl::OrdMap::new(),
            network_policies_by_name: imbl::OrdMap::new(),
            system_configuration: Arc::new(system_configuration),
            default_privileges: Arc::new(DefaultPrivileges::default()),
            system_privileges: Arc::new(PrivilegeMap::default()),
            comments: Arc::new(CommentsMap::default()),
            source_references: imbl::OrdMap::new(),
            storage_metadata: Arc::new(StorageMetadata::default()),
            temporary_schemas: imbl::OrdMap::new(),
            mock_authentication_nonce: Default::default(),
            config: mz_sql::catalog::CatalogConfig {
                start_time: to_datetime((config.now)()),
                start_instant: Instant::now(),
                nonce: rand::random(),
                environment_id: config.environment_id,
                session_id: Uuid::new_v4(),
                build_info: config.build_info,
                now: config.now.clone(),
                connection_context: config.connection_context,
                builtins_cfg: BuiltinsConfig {
                    // Whether continual-task builtins exist is decided from
                    // the parameter defaults and any remote (LaunchDarkly-style)
                    // overrides, before the builtin migrations below run.
                    include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
                        &config.system_parameter_defaults,
                        config.remote_system_parameters.as_ref(),
                        &ENABLE_CONTINUAL_TASK_BUILTINS,
                    ),
                },
                helm_chart_version: config.helm_chart_version,
            },
            cluster_replica_sizes: config.cluster_replica_sizes,
            availability_zones: config.availability_zones,
            egress_addresses: config.egress_addresses,
            aws_principal_context: config.aws_principal_context,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            http_host_name: config.http_host_name,
            license_key: config.license_key,
        };

        let deploy_generation = storage.get_deployment_generation().await?;

        // Fetch the full durable snapshot, then open the bootstrap transaction
        // through which all migrations below are staged.
        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
        let mut txn = storage.transaction().await?;

        // Stage all bootstrap migrations in the transaction.
        let new_builtin_collections = {
            migrate::durable_migrate(
                &mut txn,
                state.config.environment_id.organization_id(),
                config.boot_ts,
            )?;
            // Apply remote parameter overrides before any in-memory defaults
            // are set, and mark them as synced once.
            if let Some(remote_system_parameters) = config.remote_system_parameters {
                for (name, value) in remote_system_parameters {
                    txn.upsert_system_config(&name, value)?;
                }
                txn.set_system_config_synced_once()?;
            }
            let new_builtin_collections =
                add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
                system_cluster: config.builtin_system_cluster_config,
                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
                probe_cluster: config.builtin_probe_cluster_config,
                support_cluster: config.builtin_support_cluster_config,
                analytics_cluster: config.builtin_analytics_cluster_config,
            };
            add_new_remove_old_builtin_clusters_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
            )?;
            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
            add_new_remove_old_builtin_cluster_replicas_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
            )?;
            add_new_remove_old_builtin_roles_migration(&mut txn)?;
            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
            remove_pending_cluster_replicas_migration(&mut txn)?;

            new_builtin_collections
        };

        // Fold the migrations' staged (but uncommitted) updates into the
        // snapshot so they are applied to the in-memory state below.
        let op_updates = txn.get_and_commit_op_updates();
        updates.extend(op_updates);

        let mut builtin_table_updates = Vec::new();

        {
            // Seed system parameter defaults; unknown parameters (e.g. from a
            // newer binary's catalog) are logged and skipped, not fatal.
            for (name, value) in config.system_parameter_defaults {
                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
                    Ok(_) => (),
                    Err(Error {
                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
                    }) => {
                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
                    }
                    Err(e) => return Err(e.into()),
                };
            }
            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
        }

        // Consolidate the snapshot; after consolidation every surviving update
        // must be an addition (diff of +1).
        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
        differential_dataflow::consolidation::consolidate_updates(&mut updates);
        soft_assert_no_log!(
            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "consolidated updates should be positive during startup: {updates:?}"
        );

        // Partition updates into phases applied in dependency order:
        // roles/databases/schemas/clusters first, then system items, then user
        // items, then metadata that references items, with audit log last.
        let mut pre_item_updates = Vec::new();
        let mut system_item_updates = Vec::new();
        let mut item_updates = Vec::new();
        let mut post_item_updates = Vec::new();
        let mut audit_log_updates = Vec::new();
        for (kind, ts, diff) in updates {
            match kind {
                BootstrapStateUpdateKind::Role(_)
                | BootstrapStateUpdateKind::RoleAuth(_)
                | BootstrapStateUpdateKind::Database(_)
                | BootstrapStateUpdateKind::Schema(_)
                | BootstrapStateUpdateKind::DefaultPrivilege(_)
                | BootstrapStateUpdateKind::SystemPrivilege(_)
                | BootstrapStateUpdateKind::SystemConfiguration(_)
                | BootstrapStateUpdateKind::Cluster(_)
                | BootstrapStateUpdateKind::NetworkPolicy(_)
                | BootstrapStateUpdateKind::ClusterReplica(_) => {
                    pre_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
                    system_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
                    kind: kind.into(),
                    ts,
                    diff: diff.try_into().expect("valid diff"),
                }),
                BootstrapStateUpdateKind::Comment(_)
                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
                | BootstrapStateUpdateKind::SourceReferences(_)
                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
                    // Kept in raw (kind, ts, diff) form — migrations below may
                    // append to this set before it is applied.
                    post_item_updates.push((kind, ts, diff));
                }
                BootstrapStateUpdateKind::AuditLog(_) => {
                    audit_log_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    });
                }
            }
        }

        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(pre_item_updates, &mut LocalExpressionCache::Closed)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        {
            // If an external login password for mz_system was supplied, hash
            // it and install it directly into the in-memory state (this does
            // not go through the durable transaction).
            if let Some(password) = config.external_login_password_mz_system {
                let role_auth = RoleAuth {
                    role_id: MZ_SYSTEM_ROLE_ID,
                    password_hash: Some(
                        scram256_hash(&password, &NonZeroU32::new(600_000).expect("known valid"))
                            .map_err(|_| {
                                AdapterError::Internal("Failed to hash mz_system password.".to_owned())
                            })?,
                    ),
                    updated_at: SYSTEM_TIME(),
                };
                state
                    .role_auth_by_id
                    .insert(MZ_SYSTEM_ROLE_ID, role_auth.clone());
                let builtin_table_update = state.generate_builtin_table_update(
                    mz_catalog::memory::objects::StateUpdateKind::RoleAuth(role_auth.into()),
                    mz_catalog::memory::objects::StateDiff::Addition,
                );
                builtin_table_updates.extend(builtin_table_update);
            }
        }

        // Open the expression cache (if enabled) so that item application and
        // migration below can reuse previously optimized expressions.
        let expr_cache_start = Instant::now();
        info!("startup: coordinator init: catalog open: expr cache open beginning");
        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
        let expr_cache_enabled = config
            .enable_expression_cache_override
            .unwrap_or(enable_expr_cache_dyncfg);
        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
            info!(
                ?config.enable_expression_cache_override,
                ?enable_expr_cache_dyncfg,
                "using expression cache for startup"
            );
            // The set of live global IDs (all item versions plus system object
            // mappings); cache entries outside this set are subject to removal.
            let current_ids = txn
                .get_items()
                .flat_map(|item| {
                    let gid = item.global_id.clone();
                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
                    std::iter::once(gid).chain(gids.into_iter())
                })
                .chain(
                    txn.get_system_object_mappings()
                        .map(|som| som.unique_identifier.global_id),
                )
                .collect();
            let dyncfgs = config.persist_client.dyncfgs().clone();
            // Dev builds key the cache on the full build version (including
            // build metadata) so distinct dev builds don't share entries.
            let build_version = if config.build_info.is_dev() {
                config
                    .build_info
                    .semver_version_build()
                    .expect("build ID is not available on your platform!")
            } else {
                config.build_info.semver_version()
            };
            let expr_cache_config = ExpressionCacheConfig {
                build_version,
                shard_id: txn
                    .get_expression_cache_shard()
                    .expect("expression cache shard should exist for opened catalogs"),
                persist: config.persist_client,
                current_ids,
                remove_prior_versions: !config.read_only,
                compact_shard: config.read_only,
                dyncfgs,
            };
            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
            (
                Some(expr_cache_handle),
                cached_local_exprs,
                cached_global_exprs,
            )
        } else {
            (None, BTreeMap::new(), BTreeMap::new())
        };
        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
        info!(
            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
            expr_cache_start.elapsed()
        );

        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(system_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        let last_seen_version =
            get_migration_version(&txn).map_or_else(|| "new".into(), |v| v.to_string());

        let mz_authentication_mock_nonce =
            txn.get_authentication_mock_nonce().ok_or_else(|| {
                Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
            })?;

        state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);

        // Run item migrations (unless skipped); migrations apply the item
        // updates themselves and may emit additional post-item updates.
        let (builtin_table_update, _catalog_updates) = if !config.skip_migrations {
            let migrate_result = migrate::migrate(
                &mut state,
                &mut txn,
                &mut local_expr_cache,
                item_updates,
                config.now,
                config.boot_ts,
            )
            .await
            .map_err(|e| {
                Error::new(ErrorKind::FailedCatalogMigration {
                    last_seen_version: last_seen_version.clone(),
                    this_version: config.build_info.version,
                    cause: e.to_string(),
                })
            })?;
            if !migrate_result.post_item_updates.is_empty() {
                post_item_updates.extend(migrate_result.post_item_updates);
                // Align all post-item updates on the maximum timestamp so that
                // retractions and additions at different times consolidate
                // away (presumably required for correctness of the combined
                // batch — see consolidate_updates below).
                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
                    for (_, ts, _) in &mut post_item_updates {
                        *ts = max_ts;
                    }
                }
                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
            }

            (
                migrate_result.builtin_table_updates,
                migrate_result.catalog_updates,
            )
        } else {
            state
                .apply_updates(item_updates, &mut local_expr_cache)
                .await
        };
        builtin_table_updates.extend(builtin_table_update);

        let post_item_updates = post_item_updates
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate {
                kind: kind.into(),
                ts,
                diff: diff.try_into().expect("valid diff"),
            })
            .collect();
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(post_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // Audit log entries only feed builtin tables; they are not applied to
        // the catalog state itself.
        for audit_log_update in audit_log_updates {
            builtin_table_updates.extend(
                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
            );
        }

        // Run the builtin schema migration, then apply whatever updates it
        // staged in the transaction.
        let schema_migration_result = builtin_schema_migration::run(
            config.build_info,
            deploy_generation,
            &mut txn,
            config.builtin_item_migration_config,
        )
        .await?;

        let state_updates = txn.get_and_commit_op_updates();

        let (table_updates, _catalog_updates) = state
            .apply_updates(state_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(table_updates);
        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);

        // Record this binary's version as the catalog's migration version and
        // commit everything staged above.
        set_migration_version(&mut txn, config.build_info.semver_version())?;

        txn.commit(config.boot_ts).await?;

        // The schema migration's cleanup must only run after a successful
        // commit.
        schema_migration_result.cleanup_action.await;

        Ok(InitializeStateResult {
            state,
            migrated_storage_collections_0dt: schema_migration_result.replaced_items,
            new_builtin_collections: new_builtin_collections.into_iter().collect(),
            builtin_table_updates,
            last_seen_version,
            expr_cache_handle,
            cached_global_exprs,
            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
        })
    }
538
    /// Opens the catalog: initializes the in-memory state from
    /// `config.storage` via [`Self::initialize_state`], wraps it in a
    /// [`Catalog`], and seeds additional builtin table updates (operator
    /// implementations, egress IPs, license key) that are derived from static
    /// data rather than durable state.
    ///
    /// Returns a boxed future because the recursive/large future type cannot
    /// be named, and the work is instrumented under the `catalog::open` span.
    #[instrument(name = "catalog::open")]
    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
        async move {
            let mut storage = config.storage;

            let InitializeStateResult {
                state,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                mut builtin_table_updates,
                last_seen_version: _,
                expr_cache_handle,
                cached_global_exprs,
                uncached_local_exprs,
            } =
                Self::initialize_state(config.state, &mut storage)
                    .instrument(tracing::info_span!("catalog::initialize_state"))
                    .boxed()
                    .await?;

            let catalog = Catalog {
                state,
                plans: CatalogPlans::default(),
                expr_cache_handle,
                transient_revision: 1,
                storage: Arc::new(tokio::sync::Mutex::new(storage)),
            };

            // Emit one builtin table row per operator implementation. Only
            // scalar functions are expected in OP_IMPLS.
            for (op, func) in OP_IMPLS.iter() {
                match func {
                    mz_sql::func::Func::Scalar(impls) => {
                        for imp in impls {
                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
                            ));
                        }
                    }
                    _ => unreachable!("all operators must be scalar functions"),
                }
            }

            // Emit one builtin table row per configured egress address.
            for ip in &catalog.state.egress_addresses {
                builtin_table_updates.push(
                    catalog
                        .state
                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
                );
            }

            // Emit the license key row, if a license key is configured.
            if !catalog.state.license_key.id.is_empty() {
                builtin_table_updates.push(
                    catalog.state.resolve_builtin_table_update(
                        catalog
                            .state
                            .pack_license_key_update(&catalog.state.license_key)?,
                    ),
                );
            }

            catalog.storage().await.mark_bootstrap_complete().await;

            Ok(OpenCatalogResult {
                catalog,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                builtin_table_updates,
                cached_global_exprs,
                uncached_local_exprs,
            })
        }
        .instrument(tracing::info_span!("catalog::open"))
        .boxed()
    }
627
    /// Initializes the storage controller's durable state inside a catalog
    /// transaction: registers the catalog shard for `MZ_CATALOG_RAW`'s
    /// collection metadata (if not already recorded) and hands all storage
    /// collection global IDs to `storage_collections`.
    ///
    /// Works on a clone of the in-memory state and only swaps it in after the
    /// transaction commits; the transaction is asserted not to produce any
    /// catalog or builtin-table changes.
    async fn initialize_storage_state(
        &mut self,
        storage_collections: &Arc<
            dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
        >,
    ) -> Result<(), mz_catalog::durable::CatalogError> {
        // Global IDs of every item backed by a storage collection.
        let collections = self
            .entries()
            .filter(|entry| entry.item().is_storage_collection())
            .flat_map(|entry| entry.global_ids())
            .collect();

        // Clone so a failed transaction leaves `self.state` untouched.
        let mut state = self.state.clone();

        let mut storage = self.storage().await;
        let shard_id = storage.shard_id();
        let mut txn = storage.transaction().await?;

        // Ensure the catalog's own shard is recorded as the collection
        // metadata for MZ_CATALOG_RAW; if already recorded it must match.
        let item_id = self.resolve_builtin_storage_collection(&MZ_CATALOG_RAW);
        let global_id = self.get_entry(&item_id).latest_global_id();
        match txn.get_collection_metadata().get(&global_id) {
            None => {
                txn.insert_collection_metadata([(global_id, shard_id)].into())
                    .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            }
            Some(id) => assert_eq!(*id, shard_id),
        }

        storage_collections
            .initialize_state(&mut txn, collections)
            .await
            .map_err(mz_catalog::durable::DurableCatalogError::from)?;

        let updates = txn.get_and_commit_op_updates();
        let (builtin_updates, catalog_updates) = state
            .apply_updates(updates, &mut LocalExpressionCache::Closed)
            .await;
        assert!(
            builtin_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would cause changes to builtin tables"
        );
        assert!(
            catalog_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would change the catalog or controller state"
        );
        let commit_ts = txn.upper();
        txn.commit(commit_ts).await?;
        drop(storage);

        // Commit succeeded; install the updated state.
        self.state = state;
        Ok(())
    }
691
    /// Creates and initializes the cluster/storage controller.
    ///
    /// First prepares controller initialization in its own committed catalog
    /// transaction (asserting it stages no catalog updates), then constructs
    /// the controller from a fresh read-only transaction, and finally
    /// initializes the storage controller state via
    /// [`Self::initialize_storage_state`].
    pub async fn initialize_controller(
        &mut self,
        config: mz_controller::ControllerConfig,
        envd_epoch: core::num::NonZeroI64,
        read_only: bool,
    ) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
    {
        let controller_start = Instant::now();
        info!("startup: controller init: beginning");

        let controller = {
            let mut storage = self.storage().await;
            let mut tx = storage.transaction().await?;
            mz_controller::prepare_initialization(&mut tx)
                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            let updates = tx.get_and_commit_op_updates();
            assert!(
                updates.is_empty(),
                "initializing controller should not produce updates: {updates:?}"
            );
            let commit_ts = tx.upper();
            tx.commit(commit_ts).await?;

            // The controller only reads; give it a transaction that is never
            // committed.
            let read_only_tx = storage.transaction().await?;

            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
        };

        self.initialize_storage_state(&controller.storage_collections)
            .await?;

        info!(
            "startup: controller init: complete in {:?}",
            controller_start.elapsed()
        );

        Ok(controller)
    }
732
733 pub async fn expire(self) {
735 if let Some(storage) = Arc::into_inner(self.storage) {
738 let storage = storage.into_inner();
739 storage.expire().await;
740 }
741 }
742}
743
744impl CatalogState {
745 fn set_system_configuration_default(
747 &mut self,
748 name: &str,
749 value: VarInput,
750 ) -> Result<(), Error> {
751 Ok(Arc::make_mut(&mut self.system_configuration).set_default(name, value)?)
752 }
753}
754
755fn add_new_remove_old_builtin_items_migration(
759 builtins_cfg: &BuiltinsConfig,
760 txn: &mut mz_catalog::durable::Transaction<'_>,
761) -> Result<Vec<GlobalId>, mz_catalog::durable::CatalogError> {
762 let mut new_builtin_mappings = Vec::new();
763 let mut builtin_descs = HashSet::new();
765
766 let mut builtins = Vec::new();
769 for builtin in BUILTINS::iter(builtins_cfg) {
770 let desc = SystemObjectDescription {
771 schema_name: builtin.schema().to_string(),
772 object_type: builtin.catalog_item_type(),
773 object_name: builtin.name().to_string(),
774 };
775 if !builtin_descs.insert(desc.clone()) {
777 panic!(
778 "duplicate builtin description: {:?}, {:?}",
779 SystemObjectDescription {
780 schema_name: builtin.schema().to_string(),
781 object_type: builtin.catalog_item_type(),
782 object_name: builtin.name().to_string(),
783 },
784 builtin
785 );
786 }
787 builtins.push((desc, builtin));
788 }
789
790 let mut system_object_mappings: BTreeMap<_, _> = txn
791 .get_system_object_mappings()
792 .map(|system_object_mapping| {
793 (
794 system_object_mapping.description.clone(),
795 system_object_mapping,
796 )
797 })
798 .collect();
799
800 let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
801 builtins.into_iter().partition_map(|(desc, builtin)| {
802 let fingerprint = match builtin.runtime_alterable() {
803 false => builtin.fingerprint(),
804 true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
805 };
806 match system_object_mappings.remove(&desc) {
807 Some(system_object_mapping) => {
808 Either::Left((builtin, system_object_mapping, fingerprint))
809 }
810 None => Either::Right((builtin, fingerprint)),
811 }
812 });
813 let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
814 let new_builtins: Vec<_> = new_builtins
815 .into_iter()
816 .zip_eq(new_builtin_ids.clone())
817 .collect();
818
819 for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
821 new_builtin_mappings.push(SystemObjectMapping {
822 description: SystemObjectDescription {
823 schema_name: builtin.schema().to_string(),
824 object_type: builtin.catalog_item_type(),
825 object_name: builtin.name().to_string(),
826 },
827 unique_identifier: SystemObjectUniqueIdentifier {
828 catalog_id,
829 global_id,
830 fingerprint,
831 },
832 });
833
834 let handled_runtime_alterable = match builtin {
840 Builtin::Connection(c) if c.runtime_alterable => {
841 let mut acl_items = vec![rbac::owner_privilege(
842 mz_sql::catalog::ObjectType::Connection,
843 c.owner_id.clone(),
844 )];
845 acl_items.extend_from_slice(c.access);
846 let versions = BTreeMap::new();
848
849 txn.insert_item(
850 catalog_id,
851 c.oid,
852 global_id,
853 mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
854 c.name,
855 c.sql.into(),
856 *c.owner_id,
857 acl_items,
858 versions,
859 )?;
860 true
861 }
862 _ => false,
863 };
864 assert_eq!(
865 builtin.runtime_alterable(),
866 handled_runtime_alterable,
867 "runtime alterable object was not handled by migration",
868 );
869 }
870 txn.set_system_object_mappings(new_builtin_mappings)?;
871
872 let builtins_with_catalog_ids = existing_builtins
874 .iter()
875 .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
876 .chain(
877 new_builtins
878 .into_iter()
879 .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
880 );
881
882 for (builtin, id) in builtins_with_catalog_ids {
883 let (comment_id, desc, comments) = match builtin {
884 Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
885 Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
886 Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
887 Builtin::Log(_)
888 | Builtin::Type(_)
889 | Builtin::Func(_)
890 | Builtin::ContinualTask(_)
891 | Builtin::Index(_)
892 | Builtin::Connection(_) => continue,
893 };
894 txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;
895
896 let mut comments = comments.clone();
897 for (col_idx, name) in desc.iter_names().enumerate() {
898 if let Some(comment) = comments.remove(name.as_str()) {
899 txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
901 }
902 }
903 assert!(
904 comments.is_empty(),
905 "builtin object contains dangling comments that don't correspond to columns {comments:?}"
906 );
907 }
908
909 let mut deleted_system_objects = BTreeSet::new();
912 let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
913 let mut deleted_comments = BTreeSet::new();
914 for (desc, mapping) in system_object_mappings {
915 deleted_system_objects.insert(mapping.description);
916 if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
917 deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
918 }
919
920 let id = mapping.unique_identifier.catalog_id;
921 let comment_id = match desc.object_type {
922 CatalogItemType::Table => CommentObjectId::Table(id),
923 CatalogItemType::Source => CommentObjectId::Source(id),
924 CatalogItemType::View => CommentObjectId::View(id),
925 CatalogItemType::Sink
926 | CatalogItemType::MaterializedView
927 | CatalogItemType::Index
928 | CatalogItemType::Type
929 | CatalogItemType::Func
930 | CatalogItemType::Secret
931 | CatalogItemType::Connection
932 | CatalogItemType::ContinualTask => continue,
933 };
934 deleted_comments.insert(comment_id);
935 }
936 let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
942 assert!(
946 deleted_system_objects
947 .iter()
948 .filter(|object| object.object_type != CatalogItemType::Index)
950 .all(
951 |deleted_object| is_unstable_schema(&deleted_object.schema_name)
952 || delete_exceptions.contains(deleted_object)
953 ),
954 "only objects in unstable schemas can be deleted, deleted objects: {:?}",
955 deleted_system_objects
956 );
957 txn.drop_comments(&deleted_comments)?;
958 txn.remove_items(&deleted_runtime_alterable_system_ids)?;
959 txn.remove_system_object_mappings(deleted_system_objects)?;
960
961 let new_builtin_collections = new_builtin_ids
963 .into_iter()
964 .map(|(_catalog_id, global_id)| global_id)
965 .collect();
966
967 Ok(new_builtin_collections)
968}
969
970fn add_new_remove_old_builtin_clusters_migration(
971 txn: &mut mz_catalog::durable::Transaction<'_>,
972 builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
973) -> Result<(), mz_catalog::durable::CatalogError> {
974 let mut durable_clusters: BTreeMap<_, _> = txn
975 .get_clusters()
976 .filter(|cluster| cluster.id.is_system())
977 .map(|cluster| (cluster.name.to_string(), cluster))
978 .collect();
979
980 for builtin_cluster in BUILTIN_CLUSTERS {
982 if durable_clusters.remove(builtin_cluster.name).is_none() {
983 let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
984
985 txn.insert_system_cluster(
986 builtin_cluster.name,
987 vec![],
988 builtin_cluster.privileges.to_vec(),
989 builtin_cluster.owner_id.to_owned(),
990 mz_catalog::durable::ClusterConfig {
991 variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
992 size: cluster_config.size,
993 availability_zones: vec![],
994 replication_factor: cluster_config.replication_factor,
995 logging: default_logging_config(),
996 optimizer_feature_overrides: Default::default(),
997 schedule: Default::default(),
998 }),
999 workload_class: None,
1000 },
1001 &HashSet::new(),
1002 )?;
1003 }
1004 }
1005
1006 let old_clusters = durable_clusters
1008 .values()
1009 .map(|cluster| cluster.id)
1010 .collect();
1011 txn.remove_clusters(&old_clusters)?;
1012
1013 Ok(())
1014}
1015
1016fn add_new_remove_old_builtin_introspection_source_migration(
1017 txn: &mut mz_catalog::durable::Transaction<'_>,
1018) -> Result<(), AdapterError> {
1019 let mut new_indexes = Vec::new();
1020 let mut removed_indexes = BTreeSet::new();
1021 for cluster in txn.get_clusters() {
1022 let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
1023
1024 let mut new_logs = Vec::new();
1025
1026 for log in BUILTINS::logs() {
1027 if introspection_source_index_ids.remove(log.name).is_none() {
1028 new_logs.push(log);
1029 }
1030 }
1031
1032 for log in new_logs {
1033 let (item_id, gid) =
1034 Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1035 new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1036 }
1037
1038 removed_indexes.extend(
1041 introspection_source_index_ids
1042 .into_keys()
1043 .map(|name| (cluster.id, name.to_string())),
1044 );
1045 }
1046 txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1047 txn.remove_introspection_source_indexes(removed_indexes)?;
1048 Ok(())
1049}
1050
1051fn add_new_remove_old_builtin_roles_migration(
1052 txn: &mut mz_catalog::durable::Transaction<'_>,
1053) -> Result<(), mz_catalog::durable::CatalogError> {
1054 let mut durable_roles: BTreeMap<_, _> = txn
1055 .get_roles()
1056 .filter(|role| role.id.is_system() || role.id.is_predefined())
1057 .map(|role| (role.name.to_string(), role))
1058 .collect();
1059
1060 for builtin_role in BUILTIN_ROLES {
1062 if durable_roles.remove(builtin_role.name).is_none() {
1063 txn.insert_builtin_role(
1064 builtin_role.id,
1065 builtin_role.name.to_string(),
1066 builtin_role.attributes.clone(),
1067 RoleMembership::new(),
1068 RoleVars::default(),
1069 builtin_role.oid,
1070 )?;
1071 }
1072 }
1073
1074 let old_roles = durable_roles.values().map(|role| role.id).collect();
1076 txn.remove_roles(&old_roles)?;
1077
1078 Ok(())
1079}
1080
/// Ensures each builtin cluster replica exists durably (when its cluster's
/// bootstrap replication factor is nonzero) and removes system replicas that
/// no longer correspond to a builtin definition.
fn add_new_remove_old_builtin_cluster_replicas_migration(
    txn: &mut Transaction<'_>,
    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
) -> Result<(), AdapterError> {
    let cluster_lookup: BTreeMap<_, _> = txn
        .get_clusters()
        .map(|cluster| (cluster.name.clone(), cluster.clone()))
        .collect();

    // Durable system replicas, grouped by cluster and indexed by replica name.
    // Matched entries are removed below, so the leftovers are obsolete.
    let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
        .get_cluster_replicas()
        .filter(|replica| replica.replica_id.is_system())
        .fold(BTreeMap::new(), |mut acc, replica| {
            acc.entry(replica.cluster_id)
                .or_insert_with(BTreeMap::new)
                .insert(replica.name.to_string(), replica);
            acc
        });

    for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
        let cluster = cluster_lookup
            .get(builtin_replica.cluster_name)
            .expect("builtin cluster replica references non-existent cluster");
        // If the cluster has no durable system replicas at all, operate on a
        // throwaway empty map instead of inserting an entry into
        // `durable_replicas` (which would otherwise show up during removal).
        let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
        let replica_names = durable_replicas
            .get_mut(&cluster.id)
            .unwrap_or(&mut empty_map);

        let builtin_cluster_bootstrap_config =
            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
        // Create the replica only if it doesn't exist yet AND the cluster is
        // bootstrapped with at least one replica.
        if replica_names.remove(builtin_replica.name).is_none()
            && builtin_cluster_bootstrap_config.replication_factor > 0
        {
            // Managed clusters dictate the replica size; otherwise fall back
            // to the bootstrap config's size.
            let replica_size = match cluster.config.variant {
                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
                ClusterVariant::Unmanaged => builtin_cluster_bootstrap_config.size,
            };

            let config = builtin_cluster_replica_config(replica_size);
            txn.insert_cluster_replica(
                cluster.id,
                builtin_replica.name,
                config,
                MZ_SYSTEM_ROLE_ID,
            )?;
        }
    }

    // Any remaining durable system replica has no builtin definition; drop it.
    let old_replicas = durable_replicas
        .values()
        .flat_map(|replicas| replicas.values().map(|replica| replica.replica_id))
        .collect();
    txn.remove_cluster_replicas(&old_replicas)?;

    Ok(())
}
1143
1144fn remove_invalid_config_param_role_defaults_migration(
1151 txn: &mut Transaction<'_>,
1152) -> Result<(), AdapterError> {
1153 static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1154
1155 let roles_to_migrate: BTreeMap<_, _> = txn
1156 .get_roles()
1157 .filter_map(|mut role| {
1158 let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1163
1164 let mut invalid_roles_vars = BTreeMap::new();
1166 for (name, value) in &role.vars.map {
1167 let Ok(session_var) = session_vars.inspect(name) else {
1169 invalid_roles_vars.insert(name.clone(), value.clone());
1170 continue;
1171 };
1172 if session_var.check(value.borrow()).is_err() {
1173 invalid_roles_vars.insert(name.clone(), value.clone());
1174 }
1175 }
1176
1177 if invalid_roles_vars.is_empty() {
1179 return None;
1180 }
1181
1182 tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1183
1184 for (name, _value) in invalid_roles_vars {
1186 role.vars.map.remove(&name);
1187 }
1188 Some(role)
1189 })
1190 .map(|role| (role.id, role))
1191 .collect();
1192
1193 txn.update_roles_without_auth(roles_to_migrate)?;
1194
1195 Ok(())
1196}
1197
1198fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
1201 for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1202 if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1203 replica.config.location
1204 {
1205 tx.remove_cluster_replica(replica.replica_id)?;
1206 }
1207 }
1208 Ok(())
1209}
1210
1211pub(crate) fn builtin_cluster_replica_config(
1212 replica_size: String,
1213) -> mz_catalog::durable::ReplicaConfig {
1214 mz_catalog::durable::ReplicaConfig {
1215 location: mz_catalog::durable::ReplicaLocation::Managed {
1216 availability_zone: None,
1217 billed_as: None,
1218 pending: false,
1219 internal: false,
1220 size: replica_size,
1221 },
1222 logging: default_logging_config(),
1223 }
1224}
1225
1226fn default_logging_config() -> ReplicaLogging {
1227 ReplicaLogging {
1228 log_logging: false,
1229 interval: Some(Duration::from_secs(1)),
1230 }
1231}
1232
/// Bootstrap configurations for each builtin cluster, used when creating the
/// builtin clusters and their replicas at startup. Looked up by cluster name
/// via [`BuiltinBootstrapClusterConfigMap::get_config`].
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Bootstrap config for the `MZ_SYSTEM_CLUSTER` builtin cluster.
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Bootstrap config for the `MZ_CATALOG_SERVER_CLUSTER` builtin cluster.
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Bootstrap config for the `MZ_PROBE_CLUSTER` builtin cluster.
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Bootstrap config for the `MZ_SUPPORT_CLUSTER` builtin cluster.
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Bootstrap config for the `MZ_ANALYTICS_CLUSTER` builtin cluster.
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1246
1247impl BuiltinBootstrapClusterConfigMap {
1248 fn get_config(
1250 &self,
1251 cluster_name: &str,
1252 ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1253 let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1254 &self.system_cluster
1255 } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1256 &self.catalog_server_cluster
1257 } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1258 &self.probe_cluster
1259 } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1260 &self.support_cluster
1261 } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1262 &self.analytics_cluster
1263 } else {
1264 return Err(mz_catalog::durable::CatalogError::Catalog(
1265 SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1266 ));
1267 };
1268 Ok(cluster_config.clone())
1269 }
1270}
1271
1272pub(crate) fn into_consolidatable_updates_startup(
1289 updates: Vec<StateUpdate>,
1290 ts: Timestamp,
1291) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1292 updates
1293 .into_iter()
1294 .map(|StateUpdate { kind, ts: _, diff }| {
1295 let kind: BootstrapStateUpdateKind = kind
1296 .try_into()
1297 .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1298 (kind, ts, Diff::from(diff))
1299 })
1300 .collect()
1301}
1302
1303fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1304 defaults: &BTreeMap<String, String>,
1305 remote: Option<&BTreeMap<String, String>>,
1306 cfg: &mz_dyncfg::Config<T>,
1307) -> T::ConfigType {
1308 let mut val = T::into_config_type(cfg.default().clone());
1309 let get_fn = |map: &BTreeMap<String, String>| {
1310 let val = map.get(cfg.name())?;
1311 match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1312 Ok(x) => Some(x),
1313 Err(err) => {
1314 tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1315 None
1316 }
1317 }
1318 };
1319 if let Some(x) = get_fn(defaults) {
1320 val = x;
1321 }
1322 if let Some(x) = remote.and_then(get_fn) {
1323 val = x;
1324 }
1325 val
1326}