1mod builtin_item_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use futures::future::{BoxFuture, FutureExt};
19use itertools::{Either, Itertools};
20use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
21use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
22use mz_catalog::SYSTEM_CONN_ID;
23use mz_catalog::builtin::{
24 BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
25 Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
26};
27use mz_catalog::config::{ClusterReplicaSizeMap, StateConfig};
28use mz_catalog::durable::objects::{
29 SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
30};
31use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
32use mz_catalog::expr_cache::{
33 ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
34};
35use mz_catalog::memory::error::{Error, ErrorKind};
36use mz_catalog::memory::objects::{
37 BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, StateUpdate,
38};
39use mz_controller::clusters::{ReplicaAllocation, ReplicaLogging};
40use mz_controller_types::ClusterId;
41use mz_ore::cast::usize_to_u64;
42use mz_ore::collections::HashSet;
43use mz_ore::now::to_datetime;
44use mz_ore::{instrument, soft_assert_no_log};
45use mz_repr::adt::mz_acl_item::PrivilegeMap;
46use mz_repr::namespaces::is_unstable_schema;
47use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
48use mz_sql::catalog::{
49 BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
50};
51use mz_sql::func::OP_IMPLS;
52use mz_sql::names::CommentObjectId;
53use mz_sql::rbac;
54use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
55use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
56use mz_storage_client::storage_collections::StorageCollections;
57use timely::Container;
58use tracing::{Instrument, info, warn};
59use uuid::Uuid;
60
61use crate::AdapterError;
63use crate::catalog::open::builtin_item_migration::{
64 BuiltinItemMigrationResult, migrate_builtin_items,
65};
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{
68 BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name, migrate,
69};
70
/// Everything produced by [`Catalog::initialize_state`] while bootstrapping the in-memory
/// catalog from the durable catalog.
pub struct InitializeStateResult {
    /// The fully bootstrapped in-memory catalog state.
    pub state: CatalogState,
    /// IDs reported by `migrate_builtin_items`.
    /// NOTE(review): presumably builtin storage collections migrated as part of a 0dt
    /// upgrade — confirm in `builtin_item_migration`.
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Global IDs allocated during this boot for builtin items that had no durable
    /// system-object mapping yet.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// Resolved builtin table updates generated while applying the bootstrap updates.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Catalog content version read from the durable catalog, or `"new"` if none was stored.
    pub last_seen_version: String,
    /// Handle to the expression cache; `Some` only when the cache was enabled for this boot.
    pub expr_cache_handle: Option<ExpressionCacheHandle>,
    /// Global expressions loaded from the expression cache (empty when the cache is disabled).
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// Local expressions that were not served from the expression cache during bootstrap.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
89
/// Everything produced by [`Catalog::open`].
pub struct OpenCatalogResult {
    /// The opened catalog.
    pub catalog: Catalog,
    /// Forwarded from [`InitializeStateResult::migrated_storage_collections_0dt`].
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Forwarded from [`InitializeStateResult::new_builtin_collections`].
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// Builtin table updates from bootstrap, plus updates for operator implementations and
    /// egress addresses appended by `open`.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Global expressions loaded from the expression cache.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// Local expressions that were not served from the expression cache.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
104
impl Catalog {
    /// Builds the in-memory [`CatalogState`] from the durable catalog during startup.
    ///
    /// In order, this:
    /// 1. Validates that every builtin role and cluster name uses a reserved prefix.
    /// 2. Reads the durable snapshot, opens a transaction, and runs the durable and builtin
    ///    reconciliation migrations inside it.
    /// 3. Applies `system_parameter_defaults` and creates the system temporary schema.
    /// 4. Consolidates all updates and applies them in phases: pre-item objects, system
    ///    items, items (through `migrate::migrate` unless `skip_migrations` is set), and
    ///    post-item objects. The expression cache is opened between the first two phases.
    /// 5. Generates (without applying) builtin table updates for audit log entries.
    /// 6. Runs builtin item migrations, commits the transaction at `boot_ts`, and then awaits
    ///    the migration's cleanup action.
    ///
    /// # Errors
    /// Returns an error if reading from or committing to the durable catalog fails, if any
    /// migration fails, or if a system parameter default cannot be applied for a reason other
    /// than being an unknown parameter.
    ///
    /// # Panics
    /// Panics if a builtin role or cluster name lacks a reserved prefix, or if the durable
    /// catalog returns an empty initial snapshot.
    pub async fn initialize_state<'a>(
        config: StateConfig,
        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
    ) -> Result<InitializeStateResult, AdapterError> {
        // Every builtin role and cluster must be named with one of the reserved prefixes.
        for builtin_role in BUILTIN_ROLES {
            assert!(
                is_reserved_name(builtin_role.name),
                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }
        for builtin_cluster in BUILTIN_CLUSTERS {
            assert!(
                is_reserved_name(builtin_cluster.name),
                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }

        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
        if config.all_features {
            system_configuration.enable_all_feature_flags_by_default();
        }

        // Start from an empty in-memory state; it is populated from durable updates below.
        let mut state = CatalogState {
            database_by_name: BTreeMap::new(),
            database_by_id: BTreeMap::new(),
            entry_by_id: BTreeMap::new(),
            entry_by_global_id: BTreeMap::new(),
            ambient_schemas_by_name: BTreeMap::new(),
            ambient_schemas_by_id: BTreeMap::new(),
            clusters_by_name: BTreeMap::new(),
            clusters_by_id: BTreeMap::new(),
            roles_by_name: BTreeMap::new(),
            roles_by_id: BTreeMap::new(),
            network_policies_by_id: BTreeMap::new(),
            role_auth_by_id: BTreeMap::new(),
            network_policies_by_name: BTreeMap::new(),
            system_configuration,
            default_privileges: DefaultPrivileges::default(),
            system_privileges: PrivilegeMap::default(),
            comments: CommentsMap::default(),
            source_references: BTreeMap::new(),
            storage_metadata: Default::default(),
            temporary_schemas: BTreeMap::new(),
            config: mz_sql::catalog::CatalogConfig {
                start_time: to_datetime((config.now)()),
                start_instant: Instant::now(),
                nonce: rand::random(),
                environment_id: config.environment_id,
                session_id: Uuid::new_v4(),
                build_info: config.build_info,
                timestamp_interval: Duration::from_secs(1),
                now: config.now.clone(),
                connection_context: config.connection_context,
                builtins_cfg: BuiltinsConfig {
                    // Resolved from `system_parameter_defaults` and the remote system
                    // parameters (if any); the durable system configuration has not been
                    // read at this point.
                    include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
                        &config.system_parameter_defaults,
                        config.remote_system_parameters.as_ref(),
                        &ENABLE_CONTINUAL_TASK_BUILTINS,
                    ),
                },
                helm_chart_version: config.helm_chart_version,
            },
            cluster_replica_sizes: config.cluster_replica_sizes,
            availability_zones: config.availability_zones,
            egress_addresses: config.egress_addresses,
            aws_principal_context: config.aws_principal_context,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            http_host_name: config.http_host_name,
        };

        // Snapshot of the durable catalog; it must never be empty.
        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
        let mut txn = storage.transaction().await?;

        // Run durable migrations and reconcile builtin objects inside `txn` before the
        // in-memory catalog is loaded.
        let (migrated_builtins, new_builtin_collections) = {
            migrate::durable_migrate(
                &mut txn,
                state.config.environment_id.organization_id(),
                config.boot_ts,
            )?;
            // Persist remote system parameters (when provided) and record that a sync happened.
            if let Some(remote_system_parameters) = config.remote_system_parameters {
                for (name, value) in remote_system_parameters {
                    txn.upsert_system_config(&name, value)?;
                }
                txn.set_system_config_synced_once()?;
            }
            let (migrated_builtins, new_builtin_collections) =
                add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
                system_cluster: config.builtin_system_cluster_config,
                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
                probe_cluster: config.builtin_probe_cluster_config,
                support_cluster: config.builtin_support_cluster_config,
                analytics_cluster: config.builtin_analytics_cluster_config,
            };
            add_new_remove_old_builtin_clusters_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                &state.cluster_replica_sizes,
            )?;
            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
            add_new_remove_old_builtin_cluster_replicas_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                &state.cluster_replica_sizes,
            )?;
            add_new_remove_old_builtin_roles_migration(&mut txn)?;
            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
            (migrated_builtins, new_builtin_collections)
        };
        remove_pending_cluster_replicas_migration(&mut txn)?;

        // Fold the changes made by the migrations above into the snapshot.
        let op_updates = txn.get_and_commit_op_updates();
        updates.extend(op_updates);

        // Seed in-memory values that do not come from the durable catalog.
        {
            for (name, value) in config.system_parameter_defaults {
                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
                    Ok(_) => (),
                    // Unknown parameters are logged and skipped instead of failing startup.
                    Err(Error {
                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
                    }) => {
                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
                    }
                    Err(e) => return Err(e.into()),
                };
            }
            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
        }

        let mut builtin_table_updates = Vec::new();

        // Consolidate snapshot and migration updates. During startup every surviving update
        // is expected to be an insertion (diff of +1).
        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
        differential_dataflow::consolidation::consolidate_updates(&mut updates);
        soft_assert_no_log!(
            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "consolidated updates should be positive during startup: {updates:?}"
        );

        // Partition the updates by kind so they can be applied in phases.
        let mut pre_item_updates = Vec::new();
        let mut system_item_updates = Vec::new();
        let mut item_updates = Vec::new();
        let mut post_item_updates = Vec::new();
        let mut audit_log_updates = Vec::new();
        for (kind, ts, diff) in updates {
            match kind {
                BootstrapStateUpdateKind::Role(_)
                | BootstrapStateUpdateKind::RoleAuth(_)
                | BootstrapStateUpdateKind::Database(_)
                | BootstrapStateUpdateKind::Schema(_)
                | BootstrapStateUpdateKind::DefaultPrivilege(_)
                | BootstrapStateUpdateKind::SystemPrivilege(_)
                | BootstrapStateUpdateKind::SystemConfiguration(_)
                | BootstrapStateUpdateKind::Cluster(_)
                | BootstrapStateUpdateKind::NetworkPolicy(_)
                | BootstrapStateUpdateKind::ClusterReplica(_) => {
                    pre_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
                    system_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
                    kind: kind.into(),
                    ts,
                    diff: diff.try_into().expect("valid diff"),
                }),
                // Kept in consolidatable form because migrations may add more of these below.
                BootstrapStateUpdateKind::Comment(_)
                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
                | BootstrapStateUpdateKind::SourceReferences(_)
                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
                    post_item_updates.push((kind, ts, diff));
                }
                BootstrapStateUpdateKind::AuditLog(_) => {
                    audit_log_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    });
                }
            }
        }

        // Phase 1: objects that items depend on (roles, schemas, clusters, config, ...).
        let builtin_table_update = state
            .apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // The expression cache is opened only after phase 1 so the dyncfg below is read from
        // a system configuration that already includes the durable `SystemConfiguration`
        // updates.
        let expr_cache_start = Instant::now();
        info!("startup: coordinator init: catalog open: expr cache open beginning");
        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
        // Requires 0dt deployment; `enable_expression_cache_override`, when set, takes
        // precedence over the dyncfg.
        let expr_cache_enabled = config.enable_0dt_deployment
            && config
                .enable_expression_cache_override
                .unwrap_or(enable_expr_cache_dyncfg);
        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
            info!(
                ?config.enable_0dt_deployment,
                ?config.enable_expression_cache_override,
                ?enable_expr_cache_dyncfg,
                "using expression cache for startup"
            );
            // Every global ID currently in use: all versions of user items plus all system
            // object mappings.
            let current_ids = txn
                .get_items()
                .flat_map(|item| {
                    let gid = item.global_id.clone();
                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
                    std::iter::once(gid).chain(gids.into_iter())
                })
                .chain(
                    txn.get_system_object_mappings()
                        .map(|som| som.unique_identifier.global_id),
                )
                .collect();
            let dyncfgs = config.persist_client.dyncfgs().clone();
            // Dev builds key the cache on the version including build metadata.
            let build_version = if config.build_info.is_dev() {
                config
                    .build_info
                    .semver_version_build()
                    .expect("build ID is not available on your platform!")
            } else {
                config.build_info.semver_version()
            };
            // In read-only mode the cache compacts its shard instead of removing entries for
            // prior versions.
            let expr_cache_config = ExpressionCacheConfig {
                build_version,
                shard_id: txn
                    .get_expression_cache_shard()
                    .expect("expression cache shard should exist for opened catalogs"),
                persist: config.persist_client,
                current_ids,
                remove_prior_versions: !config.read_only,
                compact_shard: config.read_only,
                dyncfgs,
            };
            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
            (
                Some(expr_cache_handle),
                cached_local_exprs,
                cached_global_exprs,
            )
        } else {
            (None, BTreeMap::new(), BTreeMap::new())
        };
        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
        info!(
            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
            expr_cache_start.elapsed()
        );

        // Phase 2: system items (introspection source indexes and system object mappings).
        let builtin_table_update = state
            .apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        let last_seen_version = txn
            .get_catalog_content_version()
            .unwrap_or("new")
            .to_string();

        // Phase 3: items, applied through the catalog migrations unless they are skipped.
        let builtin_table_update = if !config.skip_migrations {
            let migrate_result = migrate::migrate(
                &mut state,
                &mut txn,
                &mut local_expr_cache,
                item_updates,
                config.now,
                config.boot_ts,
            )
            .await
            .map_err(|e| {
                Error::new(ErrorKind::FailedMigration {
                    last_seen_version: last_seen_version.clone(),
                    this_version: config.build_info.version,
                    cause: e.to_string(),
                })
            })?;
            if !migrate_result.post_item_updates.is_empty() {
                // Merge in post-item updates produced by the migrations, move everything to
                // the maximum timestamp, and consolidate so inserts and retractions cancel.
                post_item_updates.extend(migrate_result.post_item_updates);
                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
                    for (_, ts, _) in &mut post_item_updates {
                        *ts = max_ts;
                    }
                }
                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
            }

            migrate_result.builtin_table_updates
        } else {
            state
                .apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
                .await
        };
        builtin_table_updates.extend(builtin_table_update);

        // Phase 4: objects that depend on items (comments, storage metadata, ...).
        let post_item_updates = post_item_updates
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate {
                kind: kind.into(),
                ts,
                diff: diff.try_into().expect("valid diff"),
            })
            .collect();
        let builtin_table_update = state
            .apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // Audit log entries are not applied to the in-memory state; only their builtin table
        // updates are generated.
        for audit_log_update in audit_log_updates {
            builtin_table_updates.extend(
                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
            );
        }

        // Migrate builtin items whose fingerprints changed.
        let BuiltinItemMigrationResult {
            builtin_table_updates: builtin_table_update,
            migrated_storage_collections_0dt,
            cleanup_action,
        } = migrate_builtin_items(
            &mut state,
            &mut txn,
            &mut local_expr_cache,
            migrated_builtins,
            config.builtin_item_migration_config,
        )
        .await?;
        builtin_table_updates.extend(builtin_table_update);
        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);

        txn.commit(config.boot_ts).await?;

        // Only run the migration's cleanup after the transaction has committed.
        cleanup_action.await;

        Ok(InitializeStateResult {
            state,
            migrated_storage_collections_0dt,
            new_builtin_collections: new_builtin_collections.into_iter().collect(),
            builtin_table_updates,
            last_seen_version,
            expr_cache_handle,
            cached_global_exprs,
            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
        })
    }

    /// Opens the catalog.
    ///
    /// Initializes state from durable storage via [`Catalog::initialize_state`], wraps it in
    /// a [`Catalog`], and appends builtin table updates for every operator implementation in
    /// `OP_IMPLS` and every configured egress address. Finally marks bootstrap as complete on
    /// the durable storage.
    ///
    /// # Errors
    /// Propagates errors from [`Catalog::initialize_state`] and from packing egress IP updates.
    ///
    /// # Panics
    /// Panics if any entry of `OP_IMPLS` is not a scalar function.
    #[instrument(name = "catalog::open")]
    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
        async move {
            let mut storage = config.storage;

            let InitializeStateResult {
                state,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                mut builtin_table_updates,
                last_seen_version: _,
                expr_cache_handle,
                cached_global_exprs,
                uncached_local_exprs,
            } =
                Self::initialize_state(config.state, &mut storage)
                    .instrument(tracing::info_span!("catalog::initialize_state"))
                    .boxed()
                    .await?;

            let catalog = Catalog {
                state,
                plans: CatalogPlans::default(),
                expr_cache_handle,
                transient_revision: 1,
                storage: Arc::new(tokio::sync::Mutex::new(storage)),
            };

            // Operators are not stored durably, so their builtin table rows are generated here.
            for (op, func) in OP_IMPLS.iter() {
                match func {
                    mz_sql::func::Func::Scalar(impls) => {
                        for imp in impls {
                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
                            ));
                        }
                    }
                    _ => unreachable!("all operators must be scalar functions"),
                }
            }

            for ip in &catalog.state.egress_addresses {
                builtin_table_updates.push(
                    catalog
                        .state
                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
                );
            }

            catalog.storage().await.mark_bootstrap_complete();

            Ok(OpenCatalogResult {
                catalog,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                builtin_table_updates,
                cached_global_exprs,
                uncached_local_exprs,
            })
        }
        .instrument(tracing::info_span!("catalog::open"))
        .boxed()
    }

    /// Hands the global IDs of every storage-collection entry to `storage_collections` so it
    /// can initialize its state in a durable transaction. The resulting catalog updates are
    /// then applied to the in-memory state.
    ///
    /// The updates are applied to a clone of the state, and `self.state` is replaced only
    /// after the transaction commits. An error therefore leaves `self.state` untouched.
    ///
    /// # Panics
    /// Panics if applying the resulting updates produces builtin table updates.
    async fn initialize_storage_state(
        &mut self,
        storage_collections: &Arc<
            dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
        >,
    ) -> Result<(), mz_catalog::durable::CatalogError> {
        let collections = self
            .entries()
            .filter(|entry| entry.item().is_storage_collection())
            .flat_map(|entry| entry.global_ids())
            .collect();

        let mut state = self.state.clone();

        let mut storage = self.storage().await;
        let mut txn = storage.transaction().await?;

        storage_collections
            .initialize_state(&mut txn, collections)
            .await
            .map_err(mz_catalog::durable::DurableCatalogError::from)?;

        let updates = txn.get_and_commit_op_updates();
        let builtin_updates = state.apply_updates(updates)?;
        assert!(builtin_updates.is_empty());
        let commit_ts = txn.upper();
        txn.commit(commit_ts).await?;
        // Release the storage guard before mutating `self`.
        drop(storage);

        self.state = state;
        Ok(())
    }

    /// Creates the controller and initializes its storage state from the catalog.
    ///
    /// First runs `mz_controller::prepare_initialization` in its own durable transaction and
    /// commits it; that step must not produce catalog updates. The controller is then built
    /// from a second transaction, and [`Self::initialize_storage_state`] is run against its
    /// storage collections.
    ///
    /// # Panics
    /// Panics if controller preparation produces catalog updates.
    pub async fn initialize_controller(
        &mut self,
        config: mz_controller::ControllerConfig,
        envd_epoch: core::num::NonZeroI64,
        read_only: bool,
    ) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
    {
        let controller_start = Instant::now();
        info!("startup: controller init: beginning");

        let controller = {
            let mut storage = self.storage().await;
            let mut tx = storage.transaction().await?;
            mz_controller::prepare_initialization(&mut tx)
                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            let updates = tx.get_and_commit_op_updates();
            assert!(
                updates.is_empty(),
                "initializing controller should not produce updates: {updates:?}"
            );
            let commit_ts = tx.upper();
            tx.commit(commit_ts).await?;

            // A fresh transaction that the controller only reads from (it is passed by
            // shared reference and never committed here).
            let read_only_tx = storage.transaction().await?;

            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
        };

        self.initialize_storage_state(&controller.storage_collections)
            .await?;

        info!(
            "startup: controller init: complete in {:?}",
            controller_start.elapsed()
        );

        Ok(controller)
    }

    /// Consumes the catalog and expires its durable storage, but only if this catalog holds
    /// the last reference to that storage. Otherwise this is a no-op.
    pub async fn expire(self) {
        if let Some(storage) = Arc::into_inner(self.storage) {
            let storage = storage.into_inner();
            storage.expire().await;
        }
    }
}
660
661impl CatalogState {
662 fn set_system_configuration_default(
664 &mut self,
665 name: &str,
666 value: VarInput,
667 ) -> Result<(), Error> {
668 Ok(self.system_configuration.set_default(name, value)?)
669 }
670}
671
672fn add_new_remove_old_builtin_items_migration(
677 builtins_cfg: &BuiltinsConfig,
678 txn: &mut mz_catalog::durable::Transaction<'_>,
679) -> Result<(Vec<CatalogItemId>, Vec<GlobalId>), mz_catalog::durable::CatalogError> {
680 let mut new_builtin_mappings = Vec::new();
681 let mut migrated_builtin_ids = Vec::new();
682 let mut builtin_descs = HashSet::new();
684
685 let mut builtins = Vec::new();
688 for builtin in BUILTINS::iter(builtins_cfg) {
689 let desc = SystemObjectDescription {
690 schema_name: builtin.schema().to_string(),
691 object_type: builtin.catalog_item_type(),
692 object_name: builtin.name().to_string(),
693 };
694 if !builtin_descs.insert(desc.clone()) {
696 panic!(
697 "duplicate builtin description: {:?}, {:?}",
698 SystemObjectDescription {
699 schema_name: builtin.schema().to_string(),
700 object_type: builtin.catalog_item_type(),
701 object_name: builtin.name().to_string(),
702 },
703 builtin
704 );
705 }
706 builtins.push((desc, builtin));
707 }
708
709 let mut system_object_mappings: BTreeMap<_, _> = txn
710 .get_system_object_mappings()
711 .map(|system_object_mapping| {
712 (
713 system_object_mapping.description.clone(),
714 system_object_mapping,
715 )
716 })
717 .collect();
718
719 let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
720 builtins.into_iter().partition_map(|(desc, builtin)| {
721 let fingerprint = match builtin.runtime_alterable() {
722 false => builtin.fingerprint(),
723 true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
724 };
725 match system_object_mappings.remove(&desc) {
726 Some(system_object_mapping) => {
727 Either::Left((builtin, system_object_mapping, fingerprint))
728 }
729 None => Either::Right((builtin, fingerprint)),
730 }
731 });
732 let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
733 let new_builtins: Vec<_> = new_builtins
734 .into_iter()
735 .zip(new_builtin_ids.clone())
736 .collect();
737
738 for (builtin, system_object_mapping, fingerprint) in existing_builtins.iter().cloned() {
740 if system_object_mapping.unique_identifier.fingerprint != fingerprint {
741 assert_ne!(
747 *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, system_object_mapping.description,
748 "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
749 );
750 assert_ne!(
751 builtin.catalog_item_type(),
752 CatalogItemType::Type,
753 "types cannot be migrated"
754 );
755 assert_ne!(
756 system_object_mapping.unique_identifier.fingerprint,
757 RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
758 "clearing the runtime alterable flag on an existing object is not permitted",
759 );
760 assert!(
761 !builtin.runtime_alterable(),
762 "setting the runtime alterable flag on an existing object is not permitted"
763 );
764 migrated_builtin_ids.push(system_object_mapping.unique_identifier.catalog_id);
765 }
766 }
767
768 for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
770 new_builtin_mappings.push(SystemObjectMapping {
771 description: SystemObjectDescription {
772 schema_name: builtin.schema().to_string(),
773 object_type: builtin.catalog_item_type(),
774 object_name: builtin.name().to_string(),
775 },
776 unique_identifier: SystemObjectUniqueIdentifier {
777 catalog_id,
778 global_id,
779 fingerprint,
780 },
781 });
782
783 let handled_runtime_alterable = match builtin {
789 Builtin::Connection(c) if c.runtime_alterable => {
790 let mut acl_items = vec![rbac::owner_privilege(
791 mz_sql::catalog::ObjectType::Connection,
792 c.owner_id.clone(),
793 )];
794 acl_items.extend_from_slice(c.access);
795 let versions = BTreeMap::new();
797
798 txn.insert_item(
799 catalog_id,
800 c.oid,
801 global_id,
802 mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
803 c.name,
804 c.sql.into(),
805 *c.owner_id,
806 acl_items,
807 versions,
808 )?;
809 true
810 }
811 _ => false,
812 };
813 assert_eq!(
814 builtin.runtime_alterable(),
815 handled_runtime_alterable,
816 "runtime alterable object was not handled by migration",
817 );
818 }
819 txn.set_system_object_mappings(new_builtin_mappings)?;
820
821 let builtins_with_catalog_ids = existing_builtins
823 .iter()
824 .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
825 .chain(
826 new_builtins
827 .into_iter()
828 .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
829 );
830
831 for (builtin, id) in builtins_with_catalog_ids {
832 let (comment_id, desc, comments) = match builtin {
833 Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
834 Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
835 Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
836 Builtin::Log(_)
837 | Builtin::Type(_)
838 | Builtin::Func(_)
839 | Builtin::ContinualTask(_)
840 | Builtin::Index(_)
841 | Builtin::Connection(_) => continue,
842 };
843 txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;
844
845 let mut comments = comments.clone();
846 for (col_idx, name) in desc.iter_names().enumerate() {
847 if let Some(comment) = comments.remove(name.as_str()) {
848 txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
850 }
851 }
852 assert!(
853 comments.is_empty(),
854 "builtin object contains dangling comments that don't correspond to columns {comments:?}"
855 );
856 }
857
858 let mut deleted_system_objects = BTreeSet::new();
861 let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
862 for (_, mapping) in system_object_mappings {
863 deleted_system_objects.insert(mapping.description);
864 if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
865 deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
866 }
867 }
868 let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
874 assert!(
878 deleted_system_objects
879 .iter()
880 .filter(|object| object.object_type != CatalogItemType::Index)
882 .all(
883 |deleted_object| is_unstable_schema(&deleted_object.schema_name)
884 || delete_exceptions.contains(deleted_object)
885 ),
886 "only objects in unstable schemas can be deleted, deleted objects: {:?}",
887 deleted_system_objects
888 );
889 txn.remove_items(&deleted_runtime_alterable_system_ids)?;
890 txn.remove_system_object_mappings(deleted_system_objects)?;
891
892 let new_builtin_collections = new_builtin_ids
894 .into_iter()
895 .map(|(_catalog_id, global_id)| global_id)
896 .collect();
897
898 Ok((migrated_builtin_ids, new_builtin_collections))
899}
900
901fn add_new_remove_old_builtin_clusters_migration(
902 txn: &mut mz_catalog::durable::Transaction<'_>,
903 builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
904 cluster_sizes: &ClusterReplicaSizeMap,
905) -> Result<(), mz_catalog::durable::CatalogError> {
906 let mut durable_clusters: BTreeMap<_, _> = txn
907 .get_clusters()
908 .filter(|cluster| cluster.id.is_system())
909 .map(|cluster| (cluster.name.to_string(), cluster))
910 .collect();
911
912 for builtin_cluster in BUILTIN_CLUSTERS {
914 if durable_clusters.remove(builtin_cluster.name).is_none() {
915 let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
916 let cluster_allocation = cluster_sizes.get_allocation_by_name(&cluster_config.size)?;
917
918 txn.insert_system_cluster(
919 builtin_cluster.name,
920 vec![],
921 builtin_cluster.privileges.to_vec(),
922 builtin_cluster.owner_id.to_owned(),
923 mz_catalog::durable::ClusterConfig {
924 variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
925 size: cluster_config.size,
926 availability_zones: vec![],
927 replication_factor: cluster_config.replication_factor,
928 disk: cluster_allocation.is_cc,
929 logging: default_logging_config(),
930 optimizer_feature_overrides: Default::default(),
931 schedule: Default::default(),
932 }),
933 workload_class: None,
934 },
935 &HashSet::new(),
936 )?;
937 }
938 }
939
940 let old_clusters = durable_clusters
942 .values()
943 .map(|cluster| cluster.id)
944 .collect();
945 txn.remove_clusters(&old_clusters)?;
946
947 Ok(())
948}
949
950fn add_new_remove_old_builtin_introspection_source_migration(
951 txn: &mut mz_catalog::durable::Transaction<'_>,
952) -> Result<(), AdapterError> {
953 let mut new_indexes = Vec::new();
954 let mut removed_indexes = BTreeSet::new();
955 for cluster in txn.get_clusters() {
956 let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
957
958 let mut new_logs = Vec::new();
959
960 for log in BUILTINS::logs() {
961 if introspection_source_index_ids.remove(log.name).is_none() {
962 new_logs.push(log);
963 }
964 }
965
966 for log in new_logs {
967 let (item_id, gid) =
968 Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
969 new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
970 }
971
972 removed_indexes.extend(
975 introspection_source_index_ids
976 .into_keys()
977 .map(|name| (cluster.id, name.to_string())),
978 );
979 }
980 txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
981 txn.remove_introspection_source_indexes(removed_indexes)?;
982 Ok(())
983}
984
985fn add_new_remove_old_builtin_roles_migration(
986 txn: &mut mz_catalog::durable::Transaction<'_>,
987) -> Result<(), mz_catalog::durable::CatalogError> {
988 let mut durable_roles: BTreeMap<_, _> = txn
989 .get_roles()
990 .filter(|role| role.id.is_system() || role.id.is_predefined())
991 .map(|role| (role.name.to_string(), role))
992 .collect();
993
994 for builtin_role in BUILTIN_ROLES {
996 if durable_roles.remove(builtin_role.name).is_none() {
997 txn.insert_builtin_role(
998 builtin_role.id,
999 builtin_role.name.to_string(),
1000 builtin_role.attributes.clone(),
1001 RoleMembership::new(),
1002 RoleVars::default(),
1003 builtin_role.oid,
1004 )?;
1005 }
1006 }
1007
1008 let old_roles = durable_roles.values().map(|role| role.id).collect();
1010 txn.remove_roles(&old_roles)?;
1011
1012 Ok(())
1013}
1014
1015fn add_new_remove_old_builtin_cluster_replicas_migration(
1016 txn: &mut Transaction<'_>,
1017 builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1018 cluster_sizes: &ClusterReplicaSizeMap,
1019) -> Result<(), AdapterError> {
1020 let cluster_lookup: BTreeMap<_, _> = txn
1021 .get_clusters()
1022 .map(|cluster| (cluster.name.clone(), cluster.clone()))
1023 .collect();
1024
1025 let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1026 .get_cluster_replicas()
1027 .filter(|replica| replica.replica_id.is_system())
1028 .fold(BTreeMap::new(), |mut acc, replica| {
1029 acc.entry(replica.cluster_id)
1030 .or_insert_with(BTreeMap::new)
1031 .insert(replica.name.to_string(), replica);
1032 acc
1033 });
1034
1035 for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1037 let cluster = cluster_lookup
1038 .get(builtin_replica.cluster_name)
1039 .expect("builtin cluster replica references non-existent cluster");
1040 let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1042 let replica_names = durable_replicas
1043 .get_mut(&cluster.id)
1044 .unwrap_or(&mut empty_map);
1045
1046 let builtin_cluster_boostrap_config =
1047 builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1048 if replica_names.remove(builtin_replica.name).is_none()
1049 && builtin_cluster_boostrap_config.replication_factor > 0
1053 {
1054 let replica_size = match cluster.config.variant {
1055 ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1056 ClusterVariant::Unmanaged => builtin_cluster_boostrap_config.size,
1057 };
1058 let replica_allocation = cluster_sizes.get_allocation_by_name(&replica_size)?;
1059
1060 let config = builtin_cluster_replica_config(replica_size, replica_allocation);
1061 txn.insert_cluster_replica(
1062 cluster.id,
1063 builtin_replica.name,
1064 config,
1065 MZ_SYSTEM_ROLE_ID,
1066 )?;
1067 }
1068 }
1069
1070 let old_replicas = durable_replicas
1072 .values()
1073 .flat_map(|replicas| replicas.values().map(|replica| replica.replica_id))
1074 .collect();
1075 txn.remove_cluster_replicas(&old_replicas)?;
1076
1077 Ok(())
1078}
1079
1080fn remove_invalid_config_param_role_defaults_migration(
1087 txn: &mut Transaction<'_>,
1088) -> Result<(), AdapterError> {
1089 static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1090
1091 let roles_to_migrate: BTreeMap<_, _> = txn
1092 .get_roles()
1093 .filter_map(|mut role| {
1094 let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1099
1100 let mut invalid_roles_vars = BTreeMap::new();
1102 for (name, value) in &role.vars.map {
1103 let Ok(session_var) = session_vars.inspect(name) else {
1105 invalid_roles_vars.insert(name.clone(), value.clone());
1106 continue;
1107 };
1108 if session_var.check(value.borrow()).is_err() {
1109 invalid_roles_vars.insert(name.clone(), value.clone());
1110 }
1111 }
1112
1113 if invalid_roles_vars.is_empty() {
1115 return None;
1116 }
1117
1118 tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1119
1120 for (name, _value) in invalid_roles_vars {
1122 role.vars.map.remove(&name);
1123 }
1124 Some(role)
1125 })
1126 .map(|role| (role.id, role))
1127 .collect();
1128
1129 txn.update_roles_without_auth(roles_to_migrate)?;
1130
1131 Ok(())
1132}
1133
1134fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
1137 for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1138 if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1139 replica.config.location
1140 {
1141 tx.remove_cluster_replica(replica.replica_id)?;
1142 }
1143 }
1144 Ok(())
1145}
1146
1147pub(crate) fn builtin_cluster_replica_config(
1148 replica_size: String,
1149 replica_allocation: &ReplicaAllocation,
1150) -> mz_catalog::durable::ReplicaConfig {
1151 mz_catalog::durable::ReplicaConfig {
1152 location: mz_catalog::durable::ReplicaLocation::Managed {
1153 availability_zone: None,
1154 billed_as: None,
1155 disk: replica_allocation.is_cc,
1156 pending: false,
1157 internal: false,
1158 size: replica_size,
1159 },
1160 logging: default_logging_config(),
1161 }
1162}
1163
1164fn default_logging_config() -> ReplicaLogging {
1165 ReplicaLogging {
1166 log_logging: false,
1167 interval: Some(Duration::from_secs(1)),
1168 }
1169}
1170
/// Bootstrap configuration for each builtin cluster.
///
/// Each field holds the [`BootstrapBuiltinClusterConfig`] for one builtin
/// cluster. `get_config` selects the field that matches a given cluster name.
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Config for the cluster named by `mz_catalog::builtin::MZ_SYSTEM_CLUSTER`.
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Config for the cluster named by `mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER`.
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Config for the cluster named by `mz_catalog::builtin::MZ_PROBE_CLUSTER`.
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Config for the cluster named by `mz_catalog::builtin::MZ_SUPPORT_CLUSTER`.
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Config for the cluster named by `mz_catalog::builtin::MZ_ANALYTICS_CLUSTER`.
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1184
1185impl BuiltinBootstrapClusterConfigMap {
1186 fn get_config(
1188 &self,
1189 cluster_name: &str,
1190 ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1191 let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1192 &self.system_cluster
1193 } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1194 &self.catalog_server_cluster
1195 } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1196 &self.probe_cluster
1197 } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1198 &self.support_cluster
1199 } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1200 &self.analytics_cluster
1201 } else {
1202 return Err(mz_catalog::durable::CatalogError::Catalog(
1203 SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1204 ));
1205 };
1206 Ok(cluster_config.clone())
1207 }
1208}
1209
1210pub(crate) fn into_consolidatable_updates_startup(
1227 updates: Vec<StateUpdate>,
1228 ts: Timestamp,
1229) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1230 updates
1231 .into_iter()
1232 .map(|StateUpdate { kind, ts: _, diff }| {
1233 let kind: BootstrapStateUpdateKind = kind
1234 .try_into()
1235 .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1236 (kind, ts, Diff::from(diff))
1237 })
1238 .collect()
1239}
1240
1241fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1242 defaults: &BTreeMap<String, String>,
1243 remote: Option<&BTreeMap<String, String>>,
1244 cfg: &mz_dyncfg::Config<T>,
1245) -> T::ConfigType {
1246 let mut val = T::into_config_type(cfg.default().clone());
1247 let get_fn = |map: &BTreeMap<String, String>| {
1248 let val = map.get(cfg.name())?;
1249 match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1250 Ok(x) => Some(x),
1251 Err(err) => {
1252 tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1253 None
1254 }
1255 }
1256 };
1257 if let Some(x) = get_fn(defaults) {
1258 val = x;
1259 }
1260 if let Some(x) = remote.and_then(get_fn) {
1261 val = x;
1262 }
1263 val
1264}