1mod builtin_item_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use futures::future::{BoxFuture, FutureExt};
19use itertools::{Either, Itertools};
20use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
21use mz_adapter_types::dyncfgs::{
22 ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE, FORCE_SWAP_FOR_CC_SIZES,
23};
24use mz_auth::hash::scram256_hash;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27 BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
28 Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
29};
30use mz_catalog::config::StateConfig;
31use mz_catalog::durable::objects::{
32 SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
33};
34use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
35use mz_catalog::expr_cache::{
36 ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
37};
38use mz_catalog::memory::error::{Error, ErrorKind};
39use mz_catalog::memory::objects::{
40 BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
41};
42use mz_controller::clusters::{ReplicaLocation, ReplicaLogging};
43use mz_controller_types::ClusterId;
44use mz_ore::cast::usize_to_u64;
45use mz_ore::collections::HashSet;
46use mz_ore::now::{SYSTEM_TIME, to_datetime};
47use mz_ore::{instrument, soft_assert_no_log};
48use mz_repr::adt::mz_acl_item::PrivilegeMap;
49use mz_repr::namespaces::is_unstable_schema;
50use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
51use mz_sql::catalog::{
52 BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
53};
54use mz_sql::func::OP_IMPLS;
55use mz_sql::names::CommentObjectId;
56use mz_sql::rbac;
57use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
58use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
59use mz_storage_client::storage_collections::StorageCollections;
60use timely::Container;
61use tracing::{Instrument, info, warn};
62use uuid::Uuid;
63
64use crate::AdapterError;
66use crate::catalog::open::builtin_item_migration::{
67 BuiltinItemMigrationResult, migrate_builtin_items,
68};
69use crate::catalog::state::LocalExpressionCache;
70use crate::catalog::{
71 BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name, migrate,
72};
73
74pub struct InitializeStateResult {
75 pub state: CatalogState,
77 pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
79 pub new_builtin_collections: BTreeSet<GlobalId>,
81 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
83 pub last_seen_version: String,
85 pub expr_cache_handle: Option<ExpressionCacheHandle>,
87 pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
89 pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
91}
92
93pub struct OpenCatalogResult {
94 pub catalog: Catalog,
96 pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
98 pub new_builtin_collections: BTreeSet<GlobalId>,
100 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
102 pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
104 pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
106}
107
108impl Catalog {
109 pub async fn initialize_state<'a>(
113 config: StateConfig,
114 storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
115 ) -> Result<InitializeStateResult, AdapterError> {
116 for builtin_role in BUILTIN_ROLES {
117 assert!(
118 is_reserved_name(builtin_role.name),
119 "builtin role {builtin_role:?} must start with one of the following prefixes {}",
120 BUILTIN_PREFIXES.join(", ")
121 );
122 }
123 for builtin_cluster in BUILTIN_CLUSTERS {
124 assert!(
125 is_reserved_name(builtin_cluster.name),
126 "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
127 BUILTIN_PREFIXES.join(", ")
128 );
129 }
130
131 let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
132 if config.all_features {
133 system_configuration.enable_all_feature_flags_by_default();
134 }
135
136 let mut state = CatalogState {
137 database_by_name: BTreeMap::new(),
138 database_by_id: BTreeMap::new(),
139 entry_by_id: BTreeMap::new(),
140 entry_by_global_id: BTreeMap::new(),
141 ambient_schemas_by_name: BTreeMap::new(),
142 ambient_schemas_by_id: BTreeMap::new(),
143 clusters_by_name: BTreeMap::new(),
144 clusters_by_id: BTreeMap::new(),
145 roles_by_name: BTreeMap::new(),
146 roles_by_id: BTreeMap::new(),
147 network_policies_by_id: BTreeMap::new(),
148 role_auth_by_id: BTreeMap::new(),
149 network_policies_by_name: BTreeMap::new(),
150 system_configuration,
151 default_privileges: DefaultPrivileges::default(),
152 system_privileges: PrivilegeMap::default(),
153 comments: CommentsMap::default(),
154 source_references: BTreeMap::new(),
155 storage_metadata: Default::default(),
156 temporary_schemas: BTreeMap::new(),
157 config: mz_sql::catalog::CatalogConfig {
158 start_time: to_datetime((config.now)()),
159 start_instant: Instant::now(),
160 nonce: rand::random(),
161 environment_id: config.environment_id,
162 session_id: Uuid::new_v4(),
163 build_info: config.build_info,
164 timestamp_interval: Duration::from_secs(1),
165 now: config.now.clone(),
166 connection_context: config.connection_context,
167 builtins_cfg: BuiltinsConfig {
168 include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
172 &config.system_parameter_defaults,
173 config.remote_system_parameters.as_ref(),
174 &ENABLE_CONTINUAL_TASK_BUILTINS,
175 ),
176 },
177 helm_chart_version: config.helm_chart_version,
178 },
179 cluster_replica_sizes: config.cluster_replica_sizes,
180 availability_zones: config.availability_zones,
181 egress_addresses: config.egress_addresses,
182 aws_principal_context: config.aws_principal_context,
183 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
184 http_host_name: config.http_host_name,
185 license_key: config.license_key,
186 };
187
188 let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
189 assert!(!updates.is_empty(), "initial catalog snapshot is missing");
190 let mut txn = storage.transaction().await?;
191
192 let (migrated_builtins, new_builtin_collections) = {
194 migrate::durable_migrate(
195 &mut txn,
196 state.config.environment_id.organization_id(),
197 config.boot_ts,
198 )?;
199 if let Some(remote_system_parameters) = config.remote_system_parameters {
202 for (name, value) in remote_system_parameters {
203 txn.upsert_system_config(&name, value)?;
204 }
205 txn.set_system_config_synced_once()?;
206 }
207 let (migrated_builtins, new_builtin_collections) =
209 add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
210 let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
211 system_cluster: config.builtin_system_cluster_config,
212 catalog_server_cluster: config.builtin_catalog_server_cluster_config,
213 probe_cluster: config.builtin_probe_cluster_config,
214 support_cluster: config.builtin_support_cluster_config,
215 analytics_cluster: config.builtin_analytics_cluster_config,
216 };
217 add_new_remove_old_builtin_clusters_migration(
218 &mut txn,
219 &builtin_bootstrap_cluster_config_map,
220 )?;
221 add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
222 add_new_remove_old_builtin_cluster_replicas_migration(
223 &mut txn,
224 &builtin_bootstrap_cluster_config_map,
225 )?;
226 add_new_remove_old_builtin_roles_migration(&mut txn)?;
227 remove_invalid_config_param_role_defaults_migration(&mut txn)?;
228 (migrated_builtins, new_builtin_collections)
229 };
230 remove_pending_cluster_replicas_migration(&mut txn)?;
231
232 let op_updates = txn.get_and_commit_op_updates();
233 updates.extend(op_updates);
234
235 {
237 for (name, value) in config.system_parameter_defaults {
240 match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
241 Ok(_) => (),
242 Err(Error {
243 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
244 }) => {
245 warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
246 }
247 Err(e) => return Err(e.into()),
248 };
249 }
250 state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
251 if let Some(password) = config.external_login_password_mz_system {
252 state.role_auth_by_id.insert(
253 MZ_SYSTEM_ROLE_ID,
254 RoleAuth {
255 role_id: MZ_SYSTEM_ROLE_ID,
256 password_hash: Some(scram256_hash(&password).map_err(|_| {
257 AdapterError::Internal("Failed to hash mz_system password.".to_owned())
258 })?),
259 updated_at: SYSTEM_TIME(),
260 },
261 );
262 }
263 }
264
265 let mut builtin_table_updates = Vec::new();
266
267 let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
270 differential_dataflow::consolidation::consolidate_updates(&mut updates);
271 soft_assert_no_log!(
272 updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
273 "consolidated updates should be positive during startup: {updates:?}"
274 );
275
276 let mut pre_item_updates = Vec::new();
277 let mut system_item_updates = Vec::new();
278 let mut item_updates = Vec::new();
279 let mut post_item_updates = Vec::new();
280 let mut audit_log_updates = Vec::new();
281 for (kind, ts, diff) in updates {
282 match kind {
283 BootstrapStateUpdateKind::Role(_)
284 | BootstrapStateUpdateKind::RoleAuth(_)
285 | BootstrapStateUpdateKind::Database(_)
286 | BootstrapStateUpdateKind::Schema(_)
287 | BootstrapStateUpdateKind::DefaultPrivilege(_)
288 | BootstrapStateUpdateKind::SystemPrivilege(_)
289 | BootstrapStateUpdateKind::SystemConfiguration(_)
290 | BootstrapStateUpdateKind::Cluster(_)
291 | BootstrapStateUpdateKind::NetworkPolicy(_)
292 | BootstrapStateUpdateKind::ClusterReplica(_) => {
293 pre_item_updates.push(StateUpdate {
294 kind: kind.into(),
295 ts,
296 diff: diff.try_into().expect("valid diff"),
297 })
298 }
299 BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
300 | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
301 system_item_updates.push(StateUpdate {
302 kind: kind.into(),
303 ts,
304 diff: diff.try_into().expect("valid diff"),
305 })
306 }
307 BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
308 kind: kind.into(),
309 ts,
310 diff: diff.try_into().expect("valid diff"),
311 }),
312 BootstrapStateUpdateKind::Comment(_)
313 | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
314 | BootstrapStateUpdateKind::SourceReferences(_)
315 | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
316 post_item_updates.push((kind, ts, diff));
317 }
318 BootstrapStateUpdateKind::AuditLog(_) => {
319 audit_log_updates.push(StateUpdate {
320 kind: kind.into(),
321 ts,
322 diff: diff.try_into().expect("valid diff"),
323 });
324 }
325 }
326 }
327
328 let builtin_table_update = state
329 .apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
330 .await;
331 builtin_table_updates.extend(builtin_table_update);
332
333 let expr_cache_start = Instant::now();
334 info!("startup: coordinator init: catalog open: expr cache open beginning");
335 let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
338 let expr_cache_enabled = config.enable_0dt_deployment
339 && config
340 .enable_expression_cache_override
341 .unwrap_or(enable_expr_cache_dyncfg);
342 let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
343 info!(
344 ?config.enable_0dt_deployment,
345 ?config.enable_expression_cache_override,
346 ?enable_expr_cache_dyncfg,
347 "using expression cache for startup"
348 );
349 let current_ids = txn
350 .get_items()
351 .flat_map(|item| {
352 let gid = item.global_id.clone();
353 let gids: Vec<_> = item.extra_versions.values().cloned().collect();
354 std::iter::once(gid).chain(gids.into_iter())
355 })
356 .chain(
357 txn.get_system_object_mappings()
358 .map(|som| som.unique_identifier.global_id),
359 )
360 .collect();
361 let dyncfgs = config.persist_client.dyncfgs().clone();
362 let build_version = if config.build_info.is_dev() {
363 config
366 .build_info
367 .semver_version_build()
368 .expect("build ID is not available on your platform!")
369 } else {
370 config.build_info.semver_version()
371 };
372 let expr_cache_config = ExpressionCacheConfig {
373 build_version,
374 shard_id: txn
375 .get_expression_cache_shard()
376 .expect("expression cache shard should exist for opened catalogs"),
377 persist: config.persist_client,
378 current_ids,
379 remove_prior_versions: !config.read_only,
380 compact_shard: config.read_only,
381 dyncfgs,
382 };
383 let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
384 ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
385 (
386 Some(expr_cache_handle),
387 cached_local_exprs,
388 cached_global_exprs,
389 )
390 } else {
391 (None, BTreeMap::new(), BTreeMap::new())
392 };
393 let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
394 info!(
395 "startup: coordinator init: catalog open: expr cache open complete in {:?}",
396 expr_cache_start.elapsed()
397 );
398
399 let builtin_table_update = state
400 .apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
401 .await;
402 builtin_table_updates.extend(builtin_table_update);
403
404 let last_seen_version = txn
405 .get_catalog_content_version()
406 .unwrap_or("new")
407 .to_string();
408
409 let builtin_table_update = if !config.skip_migrations {
411 let migrate_result = migrate::migrate(
412 &mut state,
413 &mut txn,
414 &mut local_expr_cache,
415 item_updates,
416 config.now,
417 config.boot_ts,
418 )
419 .await
420 .map_err(|e| {
421 Error::new(ErrorKind::FailedMigration {
422 last_seen_version: last_seen_version.clone(),
423 this_version: config.build_info.version,
424 cause: e.to_string(),
425 })
426 })?;
427 if !migrate_result.post_item_updates.is_empty() {
428 post_item_updates.extend(migrate_result.post_item_updates);
431 if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
433 for (_, ts, _) in &mut post_item_updates {
434 *ts = max_ts;
435 }
436 }
437 differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
438 }
439
440 migrate_result.builtin_table_updates
441 } else {
442 state
443 .apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
444 .await
445 };
446 builtin_table_updates.extend(builtin_table_update);
447
448 let post_item_updates = post_item_updates
449 .into_iter()
450 .map(|(kind, ts, diff)| StateUpdate {
451 kind: kind.into(),
452 ts,
453 diff: diff.try_into().expect("valid diff"),
454 })
455 .collect();
456 let builtin_table_update = state
457 .apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
458 .await;
459 builtin_table_updates.extend(builtin_table_update);
460
461 for audit_log_update in audit_log_updates {
465 builtin_table_updates.extend(
466 state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
467 );
468 }
469
470 let BuiltinItemMigrationResult {
472 builtin_table_updates: builtin_table_update,
473 migrated_storage_collections_0dt,
474 cleanup_action,
475 } = migrate_builtin_items(
476 &mut state,
477 &mut txn,
478 &mut local_expr_cache,
479 migrated_builtins,
480 config.builtin_item_migration_config,
481 )
482 .await?;
483 builtin_table_updates.extend(builtin_table_update);
484 let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);
485
486 txn.commit(config.boot_ts).await?;
487
488 cleanup_action.await;
489
490 if FORCE_SWAP_FOR_CC_SIZES.get(state.system_configuration.dyncfgs()) {
492 info!("force-enabling swap for cc replica sizes");
493
494 for size in state.cluster_replica_sizes.0.values_mut() {
495 if size.is_cc {
496 size.swap_enabled = true;
497 size.cpu_exclusive = false;
498 size.selectors.remove("materialize.cloud/scratch-fs");
499 size.selectors
500 .insert("materialize.cloud/swap".into(), "true".into());
501 }
502 }
503
504 for cluster in state.clusters_by_id.values_mut() {
509 for replica in cluster.replicas_by_id_.values_mut() {
510 if let ReplicaLocation::Managed(loc) = &mut replica.config.location {
511 let alloc = &mut loc.allocation;
512 if alloc.is_cc {
513 alloc.swap_enabled = true;
514 alloc.cpu_exclusive = false;
515 alloc.selectors.remove("materialize.cloud/scratch-fs");
516 alloc
517 .selectors
518 .insert("materialize.cloud/swap".into(), "true".into());
519 }
520 }
521 }
522 }
523 }
524
525 Ok(InitializeStateResult {
526 state,
527 migrated_storage_collections_0dt,
528 new_builtin_collections: new_builtin_collections.into_iter().collect(),
529 builtin_table_updates,
530 last_seen_version,
531 expr_cache_handle,
532 cached_global_exprs,
533 uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
534 })
535 }
536
537 #[instrument(name = "catalog::open")]
548 pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
549 async move {
550 let mut storage = config.storage;
551
552 let InitializeStateResult {
553 state,
554 migrated_storage_collections_0dt,
555 new_builtin_collections,
556 mut builtin_table_updates,
557 last_seen_version: _,
558 expr_cache_handle,
559 cached_global_exprs,
560 uncached_local_exprs,
561 } =
562 Self::initialize_state(config.state, &mut storage)
566 .instrument(tracing::info_span!("catalog::initialize_state"))
567 .boxed()
568 .await?;
569
570 let catalog = Catalog {
571 state,
572 plans: CatalogPlans::default(),
573 expr_cache_handle,
574 transient_revision: 1,
575 storage: Arc::new(tokio::sync::Mutex::new(storage)),
576 };
577
578 for (op, func) in OP_IMPLS.iter() {
581 match func {
582 mz_sql::func::Func::Scalar(impls) => {
583 for imp in impls {
584 builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
585 catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
586 ));
587 }
588 }
589 _ => unreachable!("all operators must be scalar functions"),
590 }
591 }
592
593 for ip in &catalog.state.egress_addresses {
594 builtin_table_updates.push(
595 catalog
596 .state
597 .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
598 );
599 }
600
601 if !catalog.state.license_key.id.is_empty() {
602 builtin_table_updates.push(
603 catalog.state.resolve_builtin_table_update(
604 catalog
605 .state
606 .pack_license_key_update(&catalog.state.license_key)?,
607 ),
608 );
609 }
610
611 catalog.storage().await.mark_bootstrap_complete();
612
613 Ok(OpenCatalogResult {
614 catalog,
615 migrated_storage_collections_0dt,
616 new_builtin_collections,
617 builtin_table_updates,
618 cached_global_exprs,
619 uncached_local_exprs,
620 })
621 }
622 .instrument(tracing::info_span!("catalog::open"))
623 .boxed()
624 }
625
626 async fn initialize_storage_state(
633 &mut self,
634 storage_collections: &Arc<
635 dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
636 >,
637 ) -> Result<(), mz_catalog::durable::CatalogError> {
638 let collections = self
639 .entries()
640 .filter(|entry| entry.item().is_storage_collection())
641 .flat_map(|entry| entry.global_ids())
642 .collect();
643
644 let mut state = self.state.clone();
647
648 let mut storage = self.storage().await;
649 let mut txn = storage.transaction().await?;
650
651 storage_collections
652 .initialize_state(&mut txn, collections)
653 .await
654 .map_err(mz_catalog::durable::DurableCatalogError::from)?;
655
656 let updates = txn.get_and_commit_op_updates();
657 let builtin_updates = state.apply_updates(updates)?;
658 assert!(builtin_updates.is_empty());
659 let commit_ts = txn.upper();
660 txn.commit(commit_ts).await?;
661 drop(storage);
662
663 self.state = state;
665 Ok(())
666 }
667
668 pub async fn initialize_controller(
671 &mut self,
672 config: mz_controller::ControllerConfig,
673 envd_epoch: core::num::NonZeroI64,
674 read_only: bool,
675 ) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
676 {
677 let controller_start = Instant::now();
678 info!("startup: controller init: beginning");
679
680 let controller = {
681 let mut storage = self.storage().await;
682 let mut tx = storage.transaction().await?;
683 mz_controller::prepare_initialization(&mut tx)
684 .map_err(mz_catalog::durable::DurableCatalogError::from)?;
685 let updates = tx.get_and_commit_op_updates();
686 assert!(
687 updates.is_empty(),
688 "initializing controller should not produce updates: {updates:?}"
689 );
690 let commit_ts = tx.upper();
691 tx.commit(commit_ts).await?;
692
693 let read_only_tx = storage.transaction().await?;
694
695 mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
696 };
697
698 self.initialize_storage_state(&controller.storage_collections)
699 .await?;
700
701 info!(
702 "startup: controller init: complete in {:?}",
703 controller_start.elapsed()
704 );
705
706 Ok(controller)
707 }
708
709 pub async fn expire(self) {
711 if let Some(storage) = Arc::into_inner(self.storage) {
714 let storage = storage.into_inner();
715 storage.expire().await;
716 }
717 }
718}
719
720impl CatalogState {
721 fn set_system_configuration_default(
723 &mut self,
724 name: &str,
725 value: VarInput,
726 ) -> Result<(), Error> {
727 Ok(self.system_configuration.set_default(name, value)?)
728 }
729}
730
731fn add_new_remove_old_builtin_items_migration(
736 builtins_cfg: &BuiltinsConfig,
737 txn: &mut mz_catalog::durable::Transaction<'_>,
738) -> Result<(Vec<CatalogItemId>, Vec<GlobalId>), mz_catalog::durable::CatalogError> {
739 let mut new_builtin_mappings = Vec::new();
740 let mut migrated_builtin_ids = Vec::new();
741 let mut builtin_descs = HashSet::new();
743
744 let mut builtins = Vec::new();
747 for builtin in BUILTINS::iter(builtins_cfg) {
748 let desc = SystemObjectDescription {
749 schema_name: builtin.schema().to_string(),
750 object_type: builtin.catalog_item_type(),
751 object_name: builtin.name().to_string(),
752 };
753 if !builtin_descs.insert(desc.clone()) {
755 panic!(
756 "duplicate builtin description: {:?}, {:?}",
757 SystemObjectDescription {
758 schema_name: builtin.schema().to_string(),
759 object_type: builtin.catalog_item_type(),
760 object_name: builtin.name().to_string(),
761 },
762 builtin
763 );
764 }
765 builtins.push((desc, builtin));
766 }
767
768 let mut system_object_mappings: BTreeMap<_, _> = txn
769 .get_system_object_mappings()
770 .map(|system_object_mapping| {
771 (
772 system_object_mapping.description.clone(),
773 system_object_mapping,
774 )
775 })
776 .collect();
777
778 let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
779 builtins.into_iter().partition_map(|(desc, builtin)| {
780 let fingerprint = match builtin.runtime_alterable() {
781 false => builtin.fingerprint(),
782 true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
783 };
784 match system_object_mappings.remove(&desc) {
785 Some(system_object_mapping) => {
786 Either::Left((builtin, system_object_mapping, fingerprint))
787 }
788 None => Either::Right((builtin, fingerprint)),
789 }
790 });
791 let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
792 let new_builtins: Vec<_> = new_builtins
793 .into_iter()
794 .zip(new_builtin_ids.clone())
795 .collect();
796
797 for (builtin, system_object_mapping, fingerprint) in existing_builtins.iter().cloned() {
799 if system_object_mapping.unique_identifier.fingerprint != fingerprint {
800 assert_ne!(
806 *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, system_object_mapping.description,
807 "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
808 );
809 assert_ne!(
810 builtin.catalog_item_type(),
811 CatalogItemType::Type,
812 "types cannot be migrated"
813 );
814 assert_ne!(
815 system_object_mapping.unique_identifier.fingerprint,
816 RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
817 "clearing the runtime alterable flag on an existing object is not permitted",
818 );
819 assert!(
820 !builtin.runtime_alterable(),
821 "setting the runtime alterable flag on an existing object is not permitted"
822 );
823 migrated_builtin_ids.push(system_object_mapping.unique_identifier.catalog_id);
824 }
825 }
826
827 for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
829 new_builtin_mappings.push(SystemObjectMapping {
830 description: SystemObjectDescription {
831 schema_name: builtin.schema().to_string(),
832 object_type: builtin.catalog_item_type(),
833 object_name: builtin.name().to_string(),
834 },
835 unique_identifier: SystemObjectUniqueIdentifier {
836 catalog_id,
837 global_id,
838 fingerprint,
839 },
840 });
841
842 let handled_runtime_alterable = match builtin {
848 Builtin::Connection(c) if c.runtime_alterable => {
849 let mut acl_items = vec![rbac::owner_privilege(
850 mz_sql::catalog::ObjectType::Connection,
851 c.owner_id.clone(),
852 )];
853 acl_items.extend_from_slice(c.access);
854 let versions = BTreeMap::new();
856
857 txn.insert_item(
858 catalog_id,
859 c.oid,
860 global_id,
861 mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
862 c.name,
863 c.sql.into(),
864 *c.owner_id,
865 acl_items,
866 versions,
867 )?;
868 true
869 }
870 _ => false,
871 };
872 assert_eq!(
873 builtin.runtime_alterable(),
874 handled_runtime_alterable,
875 "runtime alterable object was not handled by migration",
876 );
877 }
878 txn.set_system_object_mappings(new_builtin_mappings)?;
879
880 let builtins_with_catalog_ids = existing_builtins
882 .iter()
883 .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
884 .chain(
885 new_builtins
886 .into_iter()
887 .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
888 );
889
890 for (builtin, id) in builtins_with_catalog_ids {
891 let (comment_id, desc, comments) = match builtin {
892 Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
893 Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
894 Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
895 Builtin::Log(_)
896 | Builtin::Type(_)
897 | Builtin::Func(_)
898 | Builtin::ContinualTask(_)
899 | Builtin::Index(_)
900 | Builtin::Connection(_) => continue,
901 };
902 txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;
903
904 let mut comments = comments.clone();
905 for (col_idx, name) in desc.iter_names().enumerate() {
906 if let Some(comment) = comments.remove(name.as_str()) {
907 txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
909 }
910 }
911 assert!(
912 comments.is_empty(),
913 "builtin object contains dangling comments that don't correspond to columns {comments:?}"
914 );
915 }
916
917 let mut deleted_system_objects = BTreeSet::new();
920 let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
921 let mut deleted_comments = BTreeSet::new();
922 for (desc, mapping) in system_object_mappings {
923 deleted_system_objects.insert(mapping.description);
924 if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
925 deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
926 }
927
928 let id = mapping.unique_identifier.catalog_id;
929 let comment_id = match desc.object_type {
930 CatalogItemType::Table => CommentObjectId::Table(id),
931 CatalogItemType::Source => CommentObjectId::Source(id),
932 CatalogItemType::View => CommentObjectId::View(id),
933 CatalogItemType::Sink
934 | CatalogItemType::MaterializedView
935 | CatalogItemType::Index
936 | CatalogItemType::Type
937 | CatalogItemType::Func
938 | CatalogItemType::Secret
939 | CatalogItemType::Connection
940 | CatalogItemType::ContinualTask => continue,
941 };
942 deleted_comments.insert(comment_id);
943 }
944 let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
950 assert!(
954 deleted_system_objects
955 .iter()
956 .filter(|object| object.object_type != CatalogItemType::Index)
958 .all(
959 |deleted_object| is_unstable_schema(&deleted_object.schema_name)
960 || delete_exceptions.contains(deleted_object)
961 ),
962 "only objects in unstable schemas can be deleted, deleted objects: {:?}",
963 deleted_system_objects
964 );
965 txn.drop_comments(&deleted_comments)?;
966 txn.remove_items(&deleted_runtime_alterable_system_ids)?;
967 txn.remove_system_object_mappings(deleted_system_objects)?;
968
969 let new_builtin_collections = new_builtin_ids
971 .into_iter()
972 .map(|(_catalog_id, global_id)| global_id)
973 .collect();
974
975 Ok((migrated_builtin_ids, new_builtin_collections))
976}
977
978fn add_new_remove_old_builtin_clusters_migration(
979 txn: &mut mz_catalog::durable::Transaction<'_>,
980 builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
981) -> Result<(), mz_catalog::durable::CatalogError> {
982 let mut durable_clusters: BTreeMap<_, _> = txn
983 .get_clusters()
984 .filter(|cluster| cluster.id.is_system())
985 .map(|cluster| (cluster.name.to_string(), cluster))
986 .collect();
987
988 for builtin_cluster in BUILTIN_CLUSTERS {
990 if durable_clusters.remove(builtin_cluster.name).is_none() {
991 let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
992
993 txn.insert_system_cluster(
994 builtin_cluster.name,
995 vec![],
996 builtin_cluster.privileges.to_vec(),
997 builtin_cluster.owner_id.to_owned(),
998 mz_catalog::durable::ClusterConfig {
999 variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
1000 size: cluster_config.size,
1001 availability_zones: vec![],
1002 replication_factor: cluster_config.replication_factor,
1003 logging: default_logging_config(),
1004 optimizer_feature_overrides: Default::default(),
1005 schedule: Default::default(),
1006 }),
1007 workload_class: None,
1008 },
1009 &HashSet::new(),
1010 )?;
1011 }
1012 }
1013
1014 let old_clusters = durable_clusters
1016 .values()
1017 .map(|cluster| cluster.id)
1018 .collect();
1019 txn.remove_clusters(&old_clusters)?;
1020
1021 Ok(())
1022}
1023
1024fn add_new_remove_old_builtin_introspection_source_migration(
1025 txn: &mut mz_catalog::durable::Transaction<'_>,
1026) -> Result<(), AdapterError> {
1027 let mut new_indexes = Vec::new();
1028 let mut removed_indexes = BTreeSet::new();
1029 for cluster in txn.get_clusters() {
1030 let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
1031
1032 let mut new_logs = Vec::new();
1033
1034 for log in BUILTINS::logs() {
1035 if introspection_source_index_ids.remove(log.name).is_none() {
1036 new_logs.push(log);
1037 }
1038 }
1039
1040 for log in new_logs {
1041 let (item_id, gid) =
1042 Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1043 new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1044 }
1045
1046 removed_indexes.extend(
1049 introspection_source_index_ids
1050 .into_keys()
1051 .map(|name| (cluster.id, name.to_string())),
1052 );
1053 }
1054 txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1055 txn.remove_introspection_source_indexes(removed_indexes)?;
1056 Ok(())
1057}
1058
1059fn add_new_remove_old_builtin_roles_migration(
1060 txn: &mut mz_catalog::durable::Transaction<'_>,
1061) -> Result<(), mz_catalog::durable::CatalogError> {
1062 let mut durable_roles: BTreeMap<_, _> = txn
1063 .get_roles()
1064 .filter(|role| role.id.is_system() || role.id.is_predefined())
1065 .map(|role| (role.name.to_string(), role))
1066 .collect();
1067
1068 for builtin_role in BUILTIN_ROLES {
1070 if durable_roles.remove(builtin_role.name).is_none() {
1071 txn.insert_builtin_role(
1072 builtin_role.id,
1073 builtin_role.name.to_string(),
1074 builtin_role.attributes.clone(),
1075 RoleMembership::new(),
1076 RoleVars::default(),
1077 builtin_role.oid,
1078 )?;
1079 }
1080 }
1081
1082 let old_roles = durable_roles.values().map(|role| role.id).collect();
1084 txn.remove_roles(&old_roles)?;
1085
1086 Ok(())
1087}
1088
1089fn add_new_remove_old_builtin_cluster_replicas_migration(
1090 txn: &mut Transaction<'_>,
1091 builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1092) -> Result<(), AdapterError> {
1093 let cluster_lookup: BTreeMap<_, _> = txn
1094 .get_clusters()
1095 .map(|cluster| (cluster.name.clone(), cluster.clone()))
1096 .collect();
1097
1098 let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1099 .get_cluster_replicas()
1100 .filter(|replica| replica.replica_id.is_system())
1101 .fold(BTreeMap::new(), |mut acc, replica| {
1102 acc.entry(replica.cluster_id)
1103 .or_insert_with(BTreeMap::new)
1104 .insert(replica.name.to_string(), replica);
1105 acc
1106 });
1107
1108 for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1110 let cluster = cluster_lookup
1111 .get(builtin_replica.cluster_name)
1112 .expect("builtin cluster replica references non-existent cluster");
1113 let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1115 let replica_names = durable_replicas
1116 .get_mut(&cluster.id)
1117 .unwrap_or(&mut empty_map);
1118
1119 let builtin_cluster_boostrap_config =
1120 builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1121 if replica_names.remove(builtin_replica.name).is_none()
1122 && builtin_cluster_boostrap_config.replication_factor > 0
1126 {
1127 let replica_size = match cluster.config.variant {
1128 ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1129 ClusterVariant::Unmanaged => builtin_cluster_boostrap_config.size,
1130 };
1131
1132 let config = builtin_cluster_replica_config(replica_size);
1133 txn.insert_cluster_replica(
1134 cluster.id,
1135 builtin_replica.name,
1136 config,
1137 MZ_SYSTEM_ROLE_ID,
1138 )?;
1139 }
1140 }
1141
1142 let old_replicas = durable_replicas
1144 .values()
1145 .flat_map(|replicas| replicas.values().map(|replica| replica.replica_id))
1146 .collect();
1147 txn.remove_cluster_replicas(&old_replicas)?;
1148
1149 Ok(())
1150}
1151
1152fn remove_invalid_config_param_role_defaults_migration(
1159 txn: &mut Transaction<'_>,
1160) -> Result<(), AdapterError> {
1161 static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1162
1163 let roles_to_migrate: BTreeMap<_, _> = txn
1164 .get_roles()
1165 .filter_map(|mut role| {
1166 let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1171
1172 let mut invalid_roles_vars = BTreeMap::new();
1174 for (name, value) in &role.vars.map {
1175 let Ok(session_var) = session_vars.inspect(name) else {
1177 invalid_roles_vars.insert(name.clone(), value.clone());
1178 continue;
1179 };
1180 if session_var.check(value.borrow()).is_err() {
1181 invalid_roles_vars.insert(name.clone(), value.clone());
1182 }
1183 }
1184
1185 if invalid_roles_vars.is_empty() {
1187 return None;
1188 }
1189
1190 tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1191
1192 for (name, _value) in invalid_roles_vars {
1194 role.vars.map.remove(&name);
1195 }
1196 Some(role)
1197 })
1198 .map(|role| (role.id, role))
1199 .collect();
1200
1201 txn.update_roles_without_auth(roles_to_migrate)?;
1202
1203 Ok(())
1204}
1205
1206fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
1209 for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1210 if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1211 replica.config.location
1212 {
1213 tx.remove_cluster_replica(replica.replica_id)?;
1214 }
1215 }
1216 Ok(())
1217}
1218
1219pub(crate) fn builtin_cluster_replica_config(
1220 replica_size: String,
1221) -> mz_catalog::durable::ReplicaConfig {
1222 mz_catalog::durable::ReplicaConfig {
1223 location: mz_catalog::durable::ReplicaLocation::Managed {
1224 availability_zone: None,
1225 billed_as: None,
1226 pending: false,
1227 internal: false,
1228 size: replica_size,
1229 },
1230 logging: default_logging_config(),
1231 }
1232}
1233
1234fn default_logging_config() -> ReplicaLogging {
1235 ReplicaLogging {
1236 log_logging: false,
1237 interval: Some(Duration::from_secs(1)),
1238 }
1239}
1240
1241#[derive(Debug)]
1242pub struct BuiltinBootstrapClusterConfigMap {
1243 pub system_cluster: BootstrapBuiltinClusterConfig,
1245 pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
1247 pub probe_cluster: BootstrapBuiltinClusterConfig,
1249 pub support_cluster: BootstrapBuiltinClusterConfig,
1251 pub analytics_cluster: BootstrapBuiltinClusterConfig,
1253}
1254
1255impl BuiltinBootstrapClusterConfigMap {
1256 fn get_config(
1258 &self,
1259 cluster_name: &str,
1260 ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1261 let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1262 &self.system_cluster
1263 } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1264 &self.catalog_server_cluster
1265 } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1266 &self.probe_cluster
1267 } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1268 &self.support_cluster
1269 } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1270 &self.analytics_cluster
1271 } else {
1272 return Err(mz_catalog::durable::CatalogError::Catalog(
1273 SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1274 ));
1275 };
1276 Ok(cluster_config.clone())
1277 }
1278}
1279
1280pub(crate) fn into_consolidatable_updates_startup(
1297 updates: Vec<StateUpdate>,
1298 ts: Timestamp,
1299) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1300 updates
1301 .into_iter()
1302 .map(|StateUpdate { kind, ts: _, diff }| {
1303 let kind: BootstrapStateUpdateKind = kind
1304 .try_into()
1305 .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1306 (kind, ts, Diff::from(diff))
1307 })
1308 .collect()
1309}
1310
1311fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1312 defaults: &BTreeMap<String, String>,
1313 remote: Option<&BTreeMap<String, String>>,
1314 cfg: &mz_dyncfg::Config<T>,
1315) -> T::ConfigType {
1316 let mut val = T::into_config_type(cfg.default().clone());
1317 let get_fn = |map: &BTreeMap<String, String>| {
1318 let val = map.get(cfg.name())?;
1319 match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1320 Ok(x) => Some(x),
1321 Err(err) => {
1322 tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1323 None
1324 }
1325 }
1326 };
1327 if let Some(x) = get_fn(defaults) {
1328 val = x;
1329 }
1330 if let Some(x) = remote.and_then(get_fn) {
1331 val = x;
1332 }
1333 val
1334}