1mod builtin_schema_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU32;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use futures::future::{BoxFuture, FutureExt};
20use itertools::{Either, Itertools};
21use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
22use mz_adapter_types::dyncfgs::ENABLE_EXPRESSION_CACHE;
23use mz_audit_log::{
24 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, ObjectType, VersionedEvent,
25};
26use mz_auth::hash::scram256_hash;
27use mz_catalog::SYSTEM_CONN_ID;
28use mz_catalog::builtin::{
29 BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
30 Fingerprint, MZ_CATALOG_RAW, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
31};
32use mz_catalog::config::StateConfig;
33use mz_catalog::durable::objects::{
34 SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
35};
36use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
37use mz_catalog::expr_cache::{
38 ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
39};
40use mz_catalog::memory::error::{Error, ErrorKind};
41use mz_catalog::memory::objects::{
42 BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
43};
44use mz_controller::clusters::ReplicaLogging;
45use mz_controller_types::ClusterId;
46use mz_ore::cast::usize_to_u64;
47use mz_ore::collections::HashSet;
48use mz_ore::now::{SYSTEM_TIME, to_datetime};
49use mz_ore::{instrument, soft_assert_no_log};
50use mz_repr::adt::mz_acl_item::PrivilegeMap;
51use mz_repr::namespaces::is_unstable_schema;
52use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
53use mz_sql::catalog::{CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars};
54use mz_sql::func::OP_IMPLS;
55use mz_sql::names::CommentObjectId;
56use mz_sql::rbac;
57use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
58use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
59use mz_storage_client::controller::{StorageMetadata, StorageTxn};
60use mz_storage_client::storage_collections::StorageCollections;
61use tracing::{Instrument, info, warn};
62use uuid::Uuid;
63
64use crate::AdapterError;
66use crate::catalog::migrate::{self, get_migration_version, set_migration_version};
67use crate::catalog::state::LocalExpressionCache;
68use crate::catalog::{BuiltinTableUpdate, Catalog, CatalogState, Config, is_reserved_name};
69
/// Everything produced by [`Catalog::initialize_state`].
pub struct InitializeStateResult {
    /// The fully populated in-memory catalog state.
    pub state: CatalogState,
    /// The items reported as replaced by the builtin schema migration
    /// (`builtin_schema_migration::run`).
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// The global IDs allocated during this startup for builtin items that did not yet have a
    /// durable system object mapping.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// The builtin table updates accumulated while applying catalog updates to `state`, after
    /// being passed through `CatalogState::resolve_builtin_table_updates`.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// The migration version recorded in the durable catalog before this startup, or `"new"` if
    /// none was recorded.
    pub last_seen_version: String,
    /// A handle to the expression cache; `None` when the expression cache is disabled.
    pub expr_cache_handle: Option<ExpressionCacheHandle>,
    /// The global expressions returned when the expression cache was opened (empty when the cache
    /// is disabled).
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// The local expressions reported by the local expression cache as uncached once startup
    /// finished.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
88
/// Everything produced by [`Catalog::open`].
pub struct OpenCatalogResult {
    /// The opened catalog, wrapping the initialized state and the durable storage handle.
    pub catalog: Catalog,
    /// Forwarded unchanged from [`InitializeStateResult::migrated_storage_collections_0dt`].
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Forwarded unchanged from [`InitializeStateResult::new_builtin_collections`].
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// The builtin table updates from state initialization, extended by `open` with updates for
    /// operators, egress addresses, and (when its ID is non-empty) the license key.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Forwarded unchanged from [`InitializeStateResult::cached_global_exprs`].
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// Forwarded unchanged from [`InitializeStateResult::uncached_local_exprs`].
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
103
104impl Catalog {
    /// Builds a [`CatalogState`] from the durable catalog behind `storage`.
    ///
    /// The steps run in a fixed order inside a single durable transaction:
    ///
    /// 1. Validate that all builtin role and cluster names are reserved names.
    /// 2. Run the durable migrations and the builtin item/cluster/index/replica/role migrations.
    /// 3. Apply the resulting updates to a fresh in-memory state in stages (pre-item, system
    ///    item, item, post-item), collecting builtin table updates along the way.
    /// 4. Open the expression cache if it is enabled.
    /// 5. Run item migrations (unless `config.skip_migrations` is set) and the builtin schema
    ///    migration.
    /// 6. Record the migration version and commit the transaction at `config.boot_ts`.
    ///
    /// # Errors
    ///
    /// Returns an error if any durable catalog operation or migration fails, if the
    /// authentication mock nonce is missing, or if hashing the `mz_system` password fails.
    ///
    /// # Panics
    ///
    /// Panics if a builtin role or cluster does not have a reserved name, if the durable catalog
    /// yields no updates, or if one of the internal invariants asserted below is violated.
    pub async fn initialize_state<'a>(
        config: StateConfig,
        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
    ) -> Result<InitializeStateResult, AdapterError> {
        // Builtin roles and clusters must use reserved names; anything else is a programming
        // error in the builtin definitions.
        for builtin_role in BUILTIN_ROLES {
            assert!(
                is_reserved_name(builtin_role.name),
                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }
        for builtin_cluster in BUILTIN_CLUSTERS {
            assert!(
                is_reserved_name(builtin_cluster.name),
                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }

        // Start from fresh system variables, honoring unsafe mode and, if requested, enabling
        // all feature flags by default.
        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
        if config.all_features {
            system_configuration.enable_all_feature_flags_by_default();
        }

        // An empty in-memory catalog. Everything below populates it from the durable catalog and
        // from `config`.
        let mut state = CatalogState {
            database_by_name: imbl::OrdMap::new(),
            database_by_id: imbl::OrdMap::new(),
            entry_by_id: imbl::OrdMap::new(),
            entry_by_global_id: imbl::OrdMap::new(),
            notices_by_dep_id: imbl::OrdMap::new(),
            ambient_schemas_by_name: imbl::OrdMap::new(),
            ambient_schemas_by_id: imbl::OrdMap::new(),
            clusters_by_name: imbl::OrdMap::new(),
            clusters_by_id: imbl::OrdMap::new(),
            roles_by_name: imbl::OrdMap::new(),
            roles_by_id: imbl::OrdMap::new(),
            network_policies_by_id: imbl::OrdMap::new(),
            role_auth_by_id: imbl::OrdMap::new(),
            network_policies_by_name: imbl::OrdMap::new(),
            system_configuration: Arc::new(system_configuration),
            scoped_system_parameters: Default::default(),
            default_privileges: Arc::new(DefaultPrivileges::default()),
            system_privileges: Arc::new(PrivilegeMap::default()),
            comments: Arc::new(CommentsMap::default()),
            source_references: imbl::OrdMap::new(),
            storage_metadata: Arc::new(StorageMetadata::default()),
            temporary_schemas: imbl::OrdMap::new(),
            // Filled in further down, once the nonce has been read from the transaction.
            mock_authentication_nonce: Default::default(),
            config: mz_sql::catalog::CatalogConfig {
                start_time: to_datetime((config.now)()),
                start_instant: Instant::now(),
                nonce: rand::random(),
                environment_id: config.environment_id,
                session_id: Uuid::new_v4(),
                build_info: config.build_info,
                now: config.now.clone(),
                connection_context: config.connection_context,
                helm_chart_version: config.helm_chart_version,
            },
            cluster_replica_sizes: config.cluster_replica_sizes,
            availability_zones: config.availability_zones,
            egress_addresses: config.egress_addresses,
            aws_principal_context: config.aws_principal_context,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            http_host_name: config.http_host_name,
            license_key: config.license_key,
        };

        // Read before the transaction is opened; consumed by the builtin schema migration below.
        let deploy_generation = storage.get_deployment_generation().await?;

        // Fetch the current durable updates. An empty result is treated as a fatal error.
        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
        let mut txn = storage.transaction().await?;

        // Run the migrations that operate directly on the durable transaction, before any
        // in-memory state is derived from it.
        let new_builtin_collections = {
            migrate::durable_migrate(
                &mut txn,
                state.config.environment_id.organization_id(),
                config.boot_ts,
            )?;
            // If remote system parameters were supplied, write them into the transaction and
            // record that a sync has happened.
            if let Some(remote_system_parameters) = config.remote_system_parameters {
                for (name, value) in remote_system_parameters {
                    txn.upsert_system_config(&name, value)?;
                }
                txn.set_system_config_synced_once()?;
            }
            // Reconcile the durable builtin objects with the builtins compiled into this binary.
            let new_builtin_collections = add_new_remove_old_builtin_items_migration(&mut txn)?;
            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
                system_cluster: config.builtin_system_cluster_config,
                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
                probe_cluster: config.builtin_probe_cluster_config,
                support_cluster: config.builtin_support_cluster_config,
                analytics_cluster: config.builtin_analytics_cluster_config,
            };
            add_new_remove_old_builtin_clusters_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                config.boot_ts,
            )?;
            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
            add_new_remove_old_builtin_cluster_replicas_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                config.boot_ts,
            )?;
            add_new_remove_old_builtin_roles_migration(&mut txn)?;
            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
            remove_pending_cluster_replicas_migration(&mut txn, config.boot_ts)?;

            new_builtin_collections
        };

        // Fold the updates produced by the migrations above into the fetched updates so that the
        // in-memory state built below reflects them.
        let op_updates = txn.get_and_commit_op_updates();
        updates.extend(op_updates);

        let mut builtin_table_updates = Vec::new();

        // Seed the in-memory state with values that come from `config` rather than from the
        // durable catalog.
        {
            for (name, value) in config.system_parameter_defaults {
                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
                    Ok(_) => (),
                    // Unknown parameter names are logged and skipped; any other error aborts
                    // startup.
                    Err(Error {
                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
                    }) => {
                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
                    }
                    Err(e) => return Err(e.into()),
                };
            }
            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
        }

        // Consolidate the updates. Afterwards every remaining update is expected to be a single
        // addition; this is only soft-asserted.
        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
        differential_dataflow::consolidation::consolidate_updates(&mut updates);
        soft_assert_no_log!(
            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "consolidated updates should be positive during startup: {updates:?}"
        );

        // Bucket the updates by kind. The buckets are applied one after another further down:
        // pre-item, system item, item, and post-item. Audit log updates are handled separately.
        let mut pre_item_updates = Vec::new();
        let mut system_item_updates = Vec::new();
        let mut item_updates = Vec::new();
        let mut post_item_updates = Vec::new();
        let mut audit_log_updates = Vec::new();
        for (kind, ts, diff) in updates {
            match kind {
                BootstrapStateUpdateKind::Role(_)
                | BootstrapStateUpdateKind::RoleAuth(_)
                | BootstrapStateUpdateKind::Database(_)
                | BootstrapStateUpdateKind::Schema(_)
                | BootstrapStateUpdateKind::DefaultPrivilege(_)
                | BootstrapStateUpdateKind::SystemPrivilege(_)
                | BootstrapStateUpdateKind::SystemConfiguration(_)
                | BootstrapStateUpdateKind::ClusterSystemConfiguration(_)
                | BootstrapStateUpdateKind::ReplicaSystemConfiguration(_)
                | BootstrapStateUpdateKind::Cluster(_)
                | BootstrapStateUpdateKind::NetworkPolicy(_)
                | BootstrapStateUpdateKind::ClusterReplica(_) => {
                    pre_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
                    system_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
                    kind: kind.into(),
                    ts,
                    diff: diff.try_into().expect("valid diff"),
                }),
                // Post-item updates stay in their raw `(kind, ts, diff)` form for now: the item
                // migrations below may add to them, after which they are consolidated again.
                BootstrapStateUpdateKind::Comment(_)
                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
                | BootstrapStateUpdateKind::SourceReferences(_)
                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
                    post_item_updates.push((kind, ts, diff));
                }
                BootstrapStateUpdateKind::AuditLog(_) => {
                    audit_log_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    });
                }
            }
        }

        // Stage 1: pre-item updates. The expression cache has not been opened yet, so a closed
        // local cache is passed.
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(pre_item_updates, &mut LocalExpressionCache::Closed)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // If a password for `mz_system` was configured, hash it and install it directly in the
        // in-memory state. This does not go through `txn`.
        {
            if let Some(password) = config.external_login_password_mz_system {
                let role_auth = RoleAuth {
                    role_id: MZ_SYSTEM_ROLE_ID,
                    password_hash: Some(
                        // NOTE(review): 600_000 is presumably the hash iteration count — confirm
                        // against `scram256_hash`.
                        scram256_hash(&password, &NonZeroU32::new(600_000).expect("known valid"))
                            .map_err(|_| {
                                AdapterError::Internal("Failed to hash mz_system password.".to_owned())
                            })?,
                    ),
                    updated_at: SYSTEM_TIME(),
                };
                state
                    .role_auth_by_id
                    .insert(MZ_SYSTEM_ROLE_ID, role_auth.clone());
                let builtin_table_update = state.generate_builtin_table_update(
                    mz_catalog::memory::objects::StateUpdateKind::RoleAuth(role_auth.into()),
                    mz_catalog::memory::objects::StateDiff::Addition,
                );
                builtin_table_updates.extend(builtin_table_update);
            }
        }

        // Open the expression cache, if enabled. An explicit override in `config` takes
        // precedence over the dyncfg value read from the system configuration.
        let expr_cache_start = Instant::now();
        info!("startup: coordinator init: catalog open: expr cache open beginning");
        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
        let expr_cache_enabled = config
            .enable_expression_cache_override
            .unwrap_or(enable_expr_cache_dyncfg);
        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
            info!(
                ?config.enable_expression_cache_override,
                ?enable_expr_cache_dyncfg,
                "using expression cache for startup"
            );
            // The global IDs of every item (including all of its extra versions) and of every
            // system object mapping known to the transaction.
            let current_ids = txn
                .get_items()
                .flat_map(|item| {
                    let gid = item.global_id.clone();
                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
                    std::iter::once(gid).chain(gids)
                })
                .chain(
                    txn.get_system_object_mappings()
                        .map(|som| som.unique_identifier.global_id),
                )
                .collect();
            let dyncfgs = config.persist_client.dyncfgs().clone();
            // Dev builds use the build-enriched semver; other builds use the plain semver.
            let build_version = if config.build_info.is_dev() {
                config
                    .build_info
                    .semver_version_build()
                    .expect("build ID is not available on your platform!")
            } else {
                config.build_info.semver_version()
            };
            let expr_cache_config = ExpressionCacheConfig {
                build_version,
                shard_id: txn
                    .get_expression_cache_shard()
                    .expect("expression cache shard should exist for opened catalogs"),
                persist: config.persist_client,
                current_ids,
                // Prior versions are only removed when not in read-only mode; compaction is only
                // requested in read-only mode.
                remove_prior_versions: !config.read_only,
                compact_shard: config.read_only,
                dyncfgs,
            };
            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
            (
                Some(expr_cache_handle),
                cached_local_exprs,
                cached_global_exprs,
            )
        } else {
            (None, BTreeMap::new(), BTreeMap::new())
        };
        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
        info!(
            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
            expr_cache_start.elapsed()
        );

        // Stage 2: system item updates (introspection source indexes and system object
        // mappings), now with the local expression cache available.
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(system_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // The version recorded by the previous startup, or "new" if none was recorded.
        let last_seen_version =
            get_migration_version(&txn).map_or_else(|| "new".into(), |v| v.to_string());

        // A missing authentication mock nonce is reported as a setting error.
        let mz_authentication_mock_nonce =
            txn.get_authentication_mock_nonce().ok_or_else(|| {
                Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
            })?;

        state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);

        // Stage 3: item updates. Unless migrations are skipped, `migrate::migrate` receives the
        // item updates and returns the resulting builtin table and catalog updates.
        let (builtin_table_update, _catalog_updates) = if !config.skip_migrations {
            let migrate_result = migrate::migrate(
                &mut state,
                &mut txn,
                &mut local_expr_cache,
                item_updates,
                config.now,
                config.boot_ts,
            )
            .await
            .map_err(|e| {
                Error::new(ErrorKind::FailedCatalogMigration {
                    last_seen_version: last_seen_version.clone(),
                    this_version: config.build_info.version,
                    cause: e.to_string(),
                })
            })?;
            if !migrate_result.post_item_updates.is_empty() {
                post_item_updates.extend(migrate_result.post_item_updates);
                // Move every post-item update to the maximum timestamp before consolidating, so
                // that updates which differ only in timestamp are merged.
                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
                    for (_, ts, _) in &mut post_item_updates {
                        *ts = max_ts;
                    }
                }
                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
            }

            (
                migrate_result.builtin_table_updates,
                migrate_result.catalog_updates,
            )
        } else {
            state
                .apply_updates(item_updates, &mut local_expr_cache)
                .await
        };
        builtin_table_updates.extend(builtin_table_update);

        // Stage 4: post-item updates, converted to `StateUpdate`s only now that they are final.
        let post_item_updates = post_item_updates
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate {
                kind: kind.into(),
                ts,
                diff: diff.try_into().expect("valid diff"),
            })
            .collect();
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(post_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // Audit log updates are not passed to `apply_updates`; they only contribute builtin
        // table updates.
        for audit_log_update in audit_log_updates {
            builtin_table_updates.extend(
                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
            );
        }

        // Run the builtin schema migration against the same transaction.
        let schema_migration_result = builtin_schema_migration::run(
            config.build_info,
            deploy_generation,
            &mut txn,
            config.builtin_item_migration_config,
        )
        .await?;

        // Apply whatever the transaction has accumulated since the last drain.
        let state_updates = txn.get_and_commit_op_updates();

        let (table_updates, _catalog_updates) = state
            .apply_updates(state_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(table_updates);
        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);

        // Record this build's version as the migration version, then commit everything at the
        // boot timestamp.
        set_migration_version(&mut txn, config.build_info.semver_version())?;

        txn.commit(config.boot_ts).await?;

        // The schema migration's cleanup action runs only after the commit has succeeded.
        schema_migration_result.cleanup_action.await;

        Ok(InitializeStateResult {
            state,
            migrated_storage_collections_0dt: schema_migration_result.replaced_items,
            new_builtin_collections: new_builtin_collections.into_iter().collect(),
            builtin_table_updates,
            last_seen_version,
            expr_cache_handle,
            cached_global_exprs,
            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
        })
    }
532
533 #[instrument(name = "catalog::open")]
544 pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
545 async move {
546 let mut storage = config.storage;
547
548 let InitializeStateResult {
549 state,
550 migrated_storage_collections_0dt,
551 new_builtin_collections,
552 mut builtin_table_updates,
553 last_seen_version: _,
554 expr_cache_handle,
555 cached_global_exprs,
556 uncached_local_exprs,
557 } =
558 Self::initialize_state(config.state, &mut storage)
562 .instrument(tracing::info_span!("catalog::initialize_state"))
563 .boxed()
564 .await?;
565
566 let catalog = Catalog {
567 state,
568 expr_cache_handle,
569 transient_revision: 1,
570 storage: Arc::new(tokio::sync::Mutex::new(storage)),
571 };
572
573 for (op, func) in OP_IMPLS.iter() {
576 match func {
577 mz_sql::func::Func::Scalar(impls) => {
578 for imp in impls {
579 builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
580 catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
581 ));
582 }
583 }
584 _ => unreachable!("all operators must be scalar functions"),
585 }
586 }
587
588 for ip in &catalog.state.egress_addresses {
589 builtin_table_updates.push(
590 catalog
591 .state
592 .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
593 );
594 }
595
596 if !catalog.state.license_key.id.is_empty() {
597 builtin_table_updates.push(
598 catalog.state.resolve_builtin_table_update(
599 catalog
600 .state
601 .pack_license_key_update(&catalog.state.license_key)?,
602 ),
603 );
604 }
605
606 catalog.storage().await.mark_bootstrap_complete().await;
607
608 Ok(OpenCatalogResult {
609 catalog,
610 migrated_storage_collections_0dt,
611 new_builtin_collections,
612 builtin_table_updates,
613 cached_global_exprs,
614 uncached_local_exprs,
615 })
616 }
617 .instrument(tracing::info_span!("catalog::open"))
618 .boxed()
619 }
620
621 async fn initialize_storage_state(
628 &mut self,
629 storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
630 ) -> Result<(), mz_catalog::durable::CatalogError> {
631 let collections = self
632 .entries()
633 .filter(|entry| entry.item().is_storage_collection())
634 .flat_map(|entry| entry.global_ids())
635 .collect();
636
637 let mut state = self.state.clone();
640
641 let mut storage = self.storage().await;
642 let shard_id = storage.shard_id();
643 let mut txn = storage.transaction().await?;
644
645 let item_id = self.resolve_builtin_storage_collection(&MZ_CATALOG_RAW);
648 let global_id = self.get_entry(&item_id).latest_global_id();
649 match txn.get_collection_metadata().get(&global_id) {
650 None => {
651 txn.insert_collection_metadata([(global_id, shard_id)].into())
652 .map_err(mz_catalog::durable::DurableCatalogError::from)?;
653 }
654 Some(id) => assert_eq!(*id, shard_id),
655 }
656
657 storage_collections
658 .initialize_state(&mut txn, collections)
659 .await
660 .map_err(mz_catalog::durable::DurableCatalogError::from)?;
661
662 let updates = txn.get_and_commit_op_updates();
663 let (builtin_updates, catalog_updates) = state
664 .apply_updates(updates, &mut LocalExpressionCache::Closed)
665 .await;
666 assert!(
667 builtin_updates.is_empty(),
668 "storage is not allowed to generate catalog changes that would cause changes to builtin tables"
669 );
670 assert!(
671 catalog_updates.is_empty(),
672 "storage is not allowed to generate catalog changes that would change the catalog or controller state"
673 );
674 let commit_ts = txn.upper();
675 txn.commit(commit_ts).await?;
676 drop(storage);
677
678 self.state = state;
680 Ok(())
681 }
682
683 pub async fn initialize_controller(
686 &mut self,
687 config: mz_controller::ControllerConfig,
688 envd_epoch: core::num::NonZeroI64,
689 read_only: bool,
690 ) -> Result<mz_controller::Controller, mz_catalog::durable::CatalogError> {
691 let controller_start = Instant::now();
692 info!("startup: controller init: beginning");
693
694 let controller = {
695 let mut storage = self.storage().await;
696 let mut tx = storage.transaction().await?;
697 mz_controller::prepare_initialization(&mut tx)
698 .map_err(mz_catalog::durable::DurableCatalogError::from)?;
699 let updates = tx.get_and_commit_op_updates();
700 assert!(
701 updates.is_empty(),
702 "initializing controller should not produce updates: {updates:?}"
703 );
704 let commit_ts = tx.upper();
705 tx.commit(commit_ts).await?;
706
707 let read_only_tx = storage.transaction().await?;
708
709 mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
710 };
711
712 self.initialize_storage_state(&controller.storage_collections)
713 .await?;
714
715 info!(
716 "startup: controller init: complete in {:?}",
717 controller_start.elapsed()
718 );
719
720 Ok(controller)
721 }
722
723 pub async fn expire(self) {
725 if let Some(storage) = Arc::into_inner(self.storage) {
728 let storage = storage.into_inner();
729 storage.expire().await;
730 }
731 }
732}
733
734impl CatalogState {
735 fn set_system_configuration_default(
737 &mut self,
738 name: &str,
739 value: VarInput,
740 ) -> Result<(), Error> {
741 Ok(Arc::make_mut(&mut self.system_configuration).set_default(name, value)?)
742 }
743}
744
745fn add_new_remove_old_builtin_items_migration(
749 txn: &mut mz_catalog::durable::Transaction<'_>,
750) -> Result<Vec<GlobalId>, mz_catalog::durable::CatalogError> {
751 let mut new_builtin_mappings = Vec::new();
752 let mut builtin_descs = HashSet::new();
754
755 let mut builtins = Vec::new();
758 for builtin in BUILTINS::iter() {
759 let desc = SystemObjectDescription {
760 schema_name: builtin.schema().to_string(),
761 object_type: builtin.catalog_item_type(),
762 object_name: builtin.name().to_string(),
763 };
764 if !builtin_descs.insert(desc.clone()) {
766 panic!(
767 "duplicate builtin description: {:?}, {:?}",
768 SystemObjectDescription {
769 schema_name: builtin.schema().to_string(),
770 object_type: builtin.catalog_item_type(),
771 object_name: builtin.name().to_string(),
772 },
773 builtin
774 );
775 }
776 builtins.push((desc, builtin));
777 }
778
779 let mut system_object_mappings: BTreeMap<_, _> = txn
780 .get_system_object_mappings()
781 .map(|system_object_mapping| {
782 (
783 system_object_mapping.description.clone(),
784 system_object_mapping,
785 )
786 })
787 .collect();
788
789 let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
790 builtins.into_iter().partition_map(|(desc, builtin)| {
791 let fingerprint = match builtin.runtime_alterable() {
792 false => builtin.fingerprint(),
793 true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
794 };
795 match system_object_mappings.remove(&desc) {
796 Some(system_object_mapping) => {
797 Either::Left((builtin, system_object_mapping, fingerprint))
798 }
799 None => Either::Right((builtin, fingerprint)),
800 }
801 });
802 let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
803 let new_builtins: Vec<_> = new_builtins
804 .into_iter()
805 .zip_eq(new_builtin_ids.clone())
806 .collect();
807
808 for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
810 new_builtin_mappings.push(SystemObjectMapping {
811 description: SystemObjectDescription {
812 schema_name: builtin.schema().to_string(),
813 object_type: builtin.catalog_item_type(),
814 object_name: builtin.name().to_string(),
815 },
816 unique_identifier: SystemObjectUniqueIdentifier {
817 catalog_id,
818 global_id,
819 fingerprint,
820 },
821 });
822
823 let handled_runtime_alterable = match builtin {
829 Builtin::Connection(c) if c.runtime_alterable => {
830 let mut acl_items = vec![rbac::owner_privilege(
831 mz_sql::catalog::ObjectType::Connection,
832 c.owner_id.clone(),
833 )];
834 acl_items.extend_from_slice(c.access);
835 let versions = BTreeMap::new();
837
838 txn.insert_item(
839 catalog_id,
840 c.oid,
841 global_id,
842 mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
843 c.name,
844 c.sql.into(),
845 *c.owner_id,
846 acl_items,
847 versions,
848 )?;
849 true
850 }
851 _ => false,
852 };
853 assert_eq!(
854 builtin.runtime_alterable(),
855 handled_runtime_alterable,
856 "runtime alterable object was not handled by migration",
857 );
858 }
859 txn.set_system_object_mappings(new_builtin_mappings)?;
860
861 let builtins_with_catalog_ids = existing_builtins
863 .iter()
864 .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
865 .chain(
866 new_builtins
867 .into_iter()
868 .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
869 );
870
871 for (builtin, id) in builtins_with_catalog_ids {
872 let (comment_id, desc, comments) = match builtin {
873 Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
874 Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
875 Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
876 Builtin::MaterializedView(mv) => (
877 CommentObjectId::MaterializedView(id),
878 &mv.desc,
879 &mv.column_comments,
880 ),
881 Builtin::Log(_)
882 | Builtin::Type(_)
883 | Builtin::Func(_)
884 | Builtin::Index(_)
885 | Builtin::Connection(_) => continue,
886 };
887 txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;
888
889 let mut comments = comments.clone();
890 for (col_idx, name) in desc.iter_names().enumerate() {
891 if let Some(comment) = comments.remove(name.as_str()) {
892 txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
894 }
895 }
896 assert!(
897 comments.is_empty(),
898 "builtin object contains dangling comments that don't correspond to columns {comments:?}"
899 );
900 }
901
902 let mut deleted_system_objects = BTreeSet::new();
905 let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
906 let mut deleted_comments = BTreeSet::new();
907 for (desc, mapping) in system_object_mappings {
908 deleted_system_objects.insert(mapping.description);
909 if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
910 deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
911 }
912
913 let id = mapping.unique_identifier.catalog_id;
914 let comment_id = match desc.object_type {
915 CatalogItemType::Table => CommentObjectId::Table(id),
916 CatalogItemType::Source => CommentObjectId::Source(id),
917 CatalogItemType::View => CommentObjectId::View(id),
918 CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(id),
919 CatalogItemType::Sink
920 | CatalogItemType::Index
921 | CatalogItemType::Type
922 | CatalogItemType::Func
923 | CatalogItemType::Secret
924 | CatalogItemType::Connection => continue,
925 };
926 deleted_comments.insert(comment_id);
927 }
928 let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
934 assert!(
938 deleted_system_objects
939 .iter()
940 .filter(|object| object.object_type != CatalogItemType::Index)
942 .all(
943 |deleted_object| is_unstable_schema(&deleted_object.schema_name)
944 || delete_exceptions.contains(deleted_object)
945 ),
946 "only objects in unstable schemas can be deleted, deleted objects: {:?}",
947 deleted_system_objects
948 );
949 txn.drop_comments(&deleted_comments)?;
950 txn.remove_items(&deleted_runtime_alterable_system_ids)?;
951 txn.remove_system_object_mappings(deleted_system_objects)?;
952
953 let new_builtin_collections = new_builtin_ids
955 .into_iter()
956 .map(|(_catalog_id, global_id)| global_id)
957 .collect();
958
959 Ok(new_builtin_collections)
960}
961
962fn add_new_remove_old_builtin_clusters_migration(
963 txn: &mut mz_catalog::durable::Transaction<'_>,
964 builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
965 boot_ts: Timestamp,
966) -> Result<(), mz_catalog::durable::CatalogError> {
967 let mut durable_clusters: BTreeMap<_, _> = txn
968 .get_clusters()
969 .filter(|cluster| cluster.id.is_system())
970 .map(|cluster| (cluster.name.to_string(), cluster))
971 .collect();
972
973 for builtin_cluster in BUILTIN_CLUSTERS {
975 if durable_clusters.remove(builtin_cluster.name).is_none() {
976 let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
977
978 let cluster_id = txn.insert_system_cluster(
979 builtin_cluster.name,
980 vec![],
981 builtin_cluster.privileges.to_vec(),
982 builtin_cluster.owner_id.to_owned(),
983 mz_catalog::durable::ClusterConfig {
984 variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
985 size: cluster_config.size,
986 availability_zones: vec![],
987 replication_factor: cluster_config.replication_factor,
988 logging: default_logging_config(),
989 optimizer_feature_overrides: Default::default(),
990 schedule: Default::default(),
991 auto_scaling_strategy: None,
992 reconfiguration: None,
993 burst: None,
994 }),
995 workload_class: None,
996 },
997 &HashSet::new(),
998 )?;
999
1000 let audit_id = txn.allocate_audit_log_id()?;
1001 txn.insert_audit_log_event(VersionedEvent::new(
1002 audit_id,
1003 EventType::Create,
1004 ObjectType::Cluster,
1005 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1006 id: cluster_id.to_string(),
1007 name: builtin_cluster.name.to_string(),
1008 }),
1009 None,
1010 boot_ts.into(),
1011 ));
1012 }
1013 }
1014
1015 let old_clusters = durable_clusters
1017 .values()
1018 .map(|cluster| cluster.id)
1019 .collect();
1020 txn.remove_clusters(&old_clusters)?;
1021
1022 for (_name, cluster) in &durable_clusters {
1023 let audit_id = txn.allocate_audit_log_id()?;
1024 txn.insert_audit_log_event(VersionedEvent::new(
1025 audit_id,
1026 EventType::Drop,
1027 ObjectType::Cluster,
1028 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1029 id: cluster.id.to_string(),
1030 name: cluster.name.clone(),
1031 }),
1032 None,
1033 boot_ts.into(),
1034 ));
1035 }
1036
1037 Ok(())
1038}
1039
1040fn add_new_remove_old_builtin_introspection_source_migration(
1041 txn: &mut mz_catalog::durable::Transaction<'_>,
1042) -> Result<(), AdapterError> {
1043 let mut new_indexes = Vec::new();
1044 let mut removed_indexes = BTreeSet::new();
1045 for cluster in txn.get_clusters() {
1046 let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
1047
1048 let mut new_logs = Vec::new();
1049
1050 for log in BUILTINS::logs() {
1051 if introspection_source_index_ids.remove(log.name).is_none() {
1052 new_logs.push(log);
1053 }
1054 }
1055
1056 for log in new_logs {
1057 let (item_id, gid) =
1058 Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1059 new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1060 }
1061
1062 removed_indexes.extend(
1065 introspection_source_index_ids
1066 .into_keys()
1067 .map(|name| (cluster.id, name.to_string())),
1068 );
1069 }
1070 txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1071 txn.remove_introspection_source_indexes(removed_indexes)?;
1072 Ok(())
1073}
1074
1075fn add_new_remove_old_builtin_roles_migration(
1076 txn: &mut mz_catalog::durable::Transaction<'_>,
1077) -> Result<(), mz_catalog::durable::CatalogError> {
1078 let mut durable_roles: BTreeMap<_, _> = txn
1079 .get_roles()
1080 .filter(|role| role.id.is_system() || role.id.is_predefined())
1081 .map(|role| (role.name.to_string(), role))
1082 .collect();
1083
1084 for builtin_role in BUILTIN_ROLES {
1086 if durable_roles.remove(builtin_role.name).is_none() {
1087 txn.insert_builtin_role(
1088 builtin_role.id,
1089 builtin_role.name.to_string(),
1090 builtin_role.attributes.clone(),
1091 RoleMembership::new(),
1092 RoleVars::default(),
1093 builtin_role.oid,
1094 )?;
1095 }
1096 }
1097
1098 let old_roles = durable_roles.values().map(|role| role.id).collect();
1100 txn.remove_roles(&old_roles)?;
1101
1102 Ok(())
1103}
1104
1105fn add_new_remove_old_builtin_cluster_replicas_migration(
1106 txn: &mut Transaction<'_>,
1107 builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1108 boot_ts: Timestamp,
1109) -> Result<(), AdapterError> {
1110 let cluster_lookup: BTreeMap<_, _> = txn
1111 .get_clusters()
1112 .map(|cluster| (cluster.name.clone(), cluster.clone()))
1113 .collect();
1114
1115 let cluster_id_to_name: BTreeMap<ClusterId, String> = cluster_lookup
1116 .values()
1117 .map(|cluster| (cluster.id, cluster.name.clone()))
1118 .collect();
1119
1120 let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1121 .get_cluster_replicas()
1122 .filter(|replica| replica.replica_id.is_system())
1123 .fold(BTreeMap::new(), |mut acc, replica| {
1124 acc.entry(replica.cluster_id)
1125 .or_insert_with(BTreeMap::new)
1126 .insert(replica.name.to_string(), replica);
1127 acc
1128 });
1129
1130 for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1132 let cluster = cluster_lookup
1133 .get(builtin_replica.cluster_name)
1134 .expect("builtin cluster replica references non-existent cluster");
1135 let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1137 let replica_names = durable_replicas
1138 .get_mut(&cluster.id)
1139 .unwrap_or(&mut empty_map);
1140
1141 let builtin_cluster_bootstrap_config =
1142 builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1143 if replica_names.remove(builtin_replica.name).is_none()
1144 && builtin_cluster_bootstrap_config.replication_factor > 0
1148 {
1149 let replica_size = match cluster.config.variant {
1150 ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1151 ClusterVariant::Unmanaged => builtin_cluster_bootstrap_config.size.clone(),
1152 };
1153
1154 let config = builtin_cluster_replica_config(replica_size.clone());
1155 let replica_id = txn.insert_cluster_replica(
1156 cluster.id,
1157 builtin_replica.name,
1158 config,
1159 MZ_SYSTEM_ROLE_ID,
1160 )?;
1161
1162 let audit_id = txn.allocate_audit_log_id()?;
1163 txn.insert_audit_log_event(VersionedEvent::new(
1164 audit_id,
1165 EventType::Create,
1166 ObjectType::ClusterReplica,
1167 EventDetails::CreateClusterReplicaV4(mz_audit_log::CreateClusterReplicaV4 {
1168 cluster_id: cluster.id.to_string(),
1169 cluster_name: cluster.name.clone(),
1170 replica_id: Some(replica_id.to_string()),
1171 replica_name: builtin_replica.name.to_string(),
1172 logical_size: replica_size,
1173 billed_as: None,
1174 internal: false,
1175 reason: CreateOrDropClusterReplicaReasonV1::System,
1176 scheduling_policies: None,
1177 }),
1178 None,
1179 boot_ts.into(),
1180 ));
1181 }
1182 }
1183
1184 let old_replicas: Vec<_> = durable_replicas
1186 .values()
1187 .flat_map(|replicas| replicas.values())
1188 .collect();
1189 let old_replica_ids = old_replicas.iter().map(|r| r.replica_id).collect();
1190 txn.remove_cluster_replicas(&old_replica_ids)?;
1191
1192 for replica in &old_replicas {
1193 let cluster_name = cluster_id_to_name
1194 .get(&replica.cluster_id)
1195 .cloned()
1196 .unwrap_or_else(|| "<unknown>".to_string());
1197
1198 let audit_id = txn.allocate_audit_log_id()?;
1199 txn.insert_audit_log_event(VersionedEvent::new(
1200 audit_id,
1201 EventType::Drop,
1202 ObjectType::ClusterReplica,
1203 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1204 cluster_id: replica.cluster_id.to_string(),
1205 cluster_name,
1206 replica_id: Some(replica.replica_id.to_string()),
1207 replica_name: replica.name.clone(),
1208 reason: CreateOrDropClusterReplicaReasonV1::System,
1209 scheduling_policies: None,
1210 }),
1211 None,
1212 boot_ts.into(),
1213 ));
1214 }
1215
1216 Ok(())
1217}
1218
1219fn remove_invalid_config_param_role_defaults_migration(
1226 txn: &mut Transaction<'_>,
1227) -> Result<(), AdapterError> {
1228 static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1229
1230 let roles_to_migrate: BTreeMap<_, _> = txn
1231 .get_roles()
1232 .filter_map(|mut role| {
1233 let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1238
1239 let mut invalid_roles_vars = BTreeMap::new();
1241 for (name, value) in &role.vars.map {
1242 let Ok(session_var) = session_vars.inspect(name) else {
1244 invalid_roles_vars.insert(name.clone(), value.clone());
1245 continue;
1246 };
1247 if session_var.check(value.borrow()).is_err() {
1248 invalid_roles_vars.insert(name.clone(), value.clone());
1249 }
1250 }
1251
1252 if invalid_roles_vars.is_empty() {
1254 return None;
1255 }
1256
1257 tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1258
1259 for (name, _value) in invalid_roles_vars {
1261 role.vars.map.remove(&name);
1262 }
1263 Some(role)
1264 })
1265 .map(|role| (role.id, role))
1266 .collect();
1267
1268 txn.update_roles_without_auth(roles_to_migrate)?;
1269
1270 Ok(())
1271}
1272
1273fn remove_pending_cluster_replicas_migration(
1276 tx: &mut Transaction,
1277 boot_ts: mz_repr::Timestamp,
1278) -> Result<(), anyhow::Error> {
1279 let cluster_names: BTreeMap<_, _> = tx.get_clusters().map(|c| (c.id, c.name)).collect();
1281
1282 let occurred_at = boot_ts.into();
1283
1284 for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1285 if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1286 replica.config.location
1287 {
1288 let cluster_name = cluster_names
1289 .get(&replica.cluster_id)
1290 .cloned()
1291 .unwrap_or_else(|| "<unknown>".to_string());
1292
1293 info!(
1294 "removing pending cluster replica '{}' from cluster '{}'",
1295 replica.name, cluster_name,
1296 );
1297
1298 tx.remove_cluster_replica(replica.replica_id)?;
1299
1300 let audit_id = tx.allocate_audit_log_id()?;
1304 tx.insert_audit_log_event(VersionedEvent::new(
1305 audit_id,
1306 EventType::Drop,
1307 ObjectType::ClusterReplica,
1308 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1309 cluster_id: replica.cluster_id.to_string(),
1310 cluster_name,
1311 replica_id: Some(replica.replica_id.to_string()),
1312 replica_name: replica.name,
1313 reason: CreateOrDropClusterReplicaReasonV1::System,
1314 scheduling_policies: None,
1315 }),
1316 None,
1317 occurred_at,
1318 ));
1319 }
1320 }
1321 Ok(())
1322}
1323
1324pub(crate) fn builtin_cluster_replica_config(
1325 replica_size: String,
1326) -> mz_catalog::durable::ReplicaConfig {
1327 mz_catalog::durable::ReplicaConfig {
1328 location: mz_catalog::durable::ReplicaLocation::Managed {
1329 availability_zones: Vec::new(),
1330 billed_as: None,
1331 pending: false,
1332 internal: false,
1333 size: replica_size,
1334 },
1335 logging: default_logging_config(),
1336 }
1337}
1338
1339fn default_logging_config() -> ReplicaLogging {
1340 ReplicaLogging {
1341 log_logging: false,
1342 interval: Some(Duration::from_secs(1)),
1343 }
1344}
1345
/// Bootstrap configuration for the builtin clusters, one field per cluster.
///
/// `get_config` selects the field that corresponds to a builtin cluster name.
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Configuration returned for `mz_catalog::builtin::MZ_SYSTEM_CLUSTER`.
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Configuration returned for `mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER`.
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Configuration returned for `mz_catalog::builtin::MZ_PROBE_CLUSTER`.
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Configuration returned for `mz_catalog::builtin::MZ_SUPPORT_CLUSTER`.
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Configuration returned for `mz_catalog::builtin::MZ_ANALYTICS_CLUSTER`.
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1359
1360impl BuiltinBootstrapClusterConfigMap {
1361 fn get_config(
1363 &self,
1364 cluster_name: &str,
1365 ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1366 let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1367 &self.system_cluster
1368 } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1369 &self.catalog_server_cluster
1370 } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1371 &self.probe_cluster
1372 } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1373 &self.support_cluster
1374 } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1375 &self.analytics_cluster
1376 } else {
1377 return Err(mz_catalog::durable::CatalogError::Catalog(
1378 SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1379 ));
1380 };
1381 Ok(cluster_config.clone())
1382 }
1383}
1384
1385pub(crate) fn into_consolidatable_updates_startup(
1402 updates: Vec<StateUpdate>,
1403 ts: Timestamp,
1404) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1405 updates
1406 .into_iter()
1407 .map(|StateUpdate { kind, ts: _, diff }| {
1408 let kind: BootstrapStateUpdateKind = kind
1409 .try_into()
1410 .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1411 (kind, ts, Diff::from(diff))
1412 })
1413 .collect()
1414}