1mod builtin_item_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use futures::future::{BoxFuture, FutureExt};
19use itertools::{Either, Itertools};
20use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
21use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
22use mz_auth::hash::scram256_hash;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25 BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
26 Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
27};
28use mz_catalog::config::StateConfig;
29use mz_catalog::durable::objects::{
30 SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
31};
32use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
33use mz_catalog::expr_cache::{
34 ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
35};
36use mz_catalog::memory::error::{Error, ErrorKind};
37use mz_catalog::memory::objects::{
38 BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
39};
40use mz_controller::clusters::ReplicaLogging;
41use mz_controller_types::ClusterId;
42use mz_ore::cast::usize_to_u64;
43use mz_ore::collections::HashSet;
44use mz_ore::now::{SYSTEM_TIME, to_datetime};
45use mz_ore::{instrument, soft_assert_no_log};
46use mz_repr::adt::mz_acl_item::PrivilegeMap;
47use mz_repr::namespaces::is_unstable_schema;
48use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
49use mz_sql::catalog::{
50 BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
51};
52use mz_sql::func::OP_IMPLS;
53use mz_sql::names::CommentObjectId;
54use mz_sql::rbac;
55use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
56use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
57use mz_storage_client::storage_collections::StorageCollections;
58use tracing::{Instrument, info, warn};
59use uuid::Uuid;
60
61use crate::AdapterError;
63use crate::catalog::open::builtin_item_migration::{
64 BuiltinItemMigrationResult, migrate_builtin_items,
65};
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{
68 BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name, migrate,
69};
70
/// The result of [`Catalog::initialize_state`].
pub struct InitializeStateResult {
    /// The initialized in-memory catalog state.
    pub state: CatalogState,
    /// Storage collections whose builtin definitions were migrated by the
    /// builtin item migration (0dt flavor).
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Global IDs of builtin collections that were newly added in this version.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// Builtin table updates that the caller still needs to apply.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// The catalog content version that was last written to durable storage
    /// (`"new"` for a brand-new catalog).
    pub last_seen_version: String,
    /// A handle to the expression cache, if the cache was enabled.
    pub expr_cache_handle: Option<ExpressionCacheHandle>,
    /// Global expressions that were found in the expression cache.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// Local expressions that were NOT found in the expression cache and were
    /// computed during startup.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
89
/// The result of [`Catalog::open`].
pub struct OpenCatalogResult {
    /// The opened catalog.
    pub catalog: Catalog,
    /// Storage collections whose builtin definitions were migrated by the
    /// builtin item migration (0dt flavor).
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Global IDs of builtin collections that were newly added in this version.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// Builtin table updates that the caller still needs to apply.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Global expressions that were found in the expression cache.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// Local expressions that were NOT found in the expression cache and were
    /// computed during startup.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
104
105impl Catalog {
    /// Initializes a [`CatalogState`] from the durable catalog contents in `storage`,
    /// running all durable and in-memory catalog migrations along the way.
    ///
    /// Returns an [`InitializeStateResult`] containing the initialized state, builtin
    /// table updates the caller must apply, the builtin collections added in this
    /// version, and expression cache artifacts.
    pub async fn initialize_state<'a>(
        config: StateConfig,
        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
    ) -> Result<InitializeStateResult, AdapterError> {
        // Sanity check: every builtin role and cluster must use a reserved name
        // prefix so they can never collide with user-created objects.
        for builtin_role in BUILTIN_ROLES {
            assert!(
                is_reserved_name(builtin_role.name),
                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }
        for builtin_cluster in BUILTIN_CLUSTERS {
            assert!(
                is_reserved_name(builtin_cluster.name),
                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }

        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
        if config.all_features {
            system_configuration.enable_all_feature_flags_by_default();
        }

        // Start from an empty in-memory state; it is populated below by applying
        // the updates read back from durable storage.
        let mut state = CatalogState {
            database_by_name: BTreeMap::new(),
            database_by_id: BTreeMap::new(),
            entry_by_id: BTreeMap::new(),
            entry_by_global_id: BTreeMap::new(),
            ambient_schemas_by_name: BTreeMap::new(),
            ambient_schemas_by_id: BTreeMap::new(),
            clusters_by_name: BTreeMap::new(),
            clusters_by_id: BTreeMap::new(),
            roles_by_name: BTreeMap::new(),
            roles_by_id: BTreeMap::new(),
            network_policies_by_id: BTreeMap::new(),
            role_auth_by_id: BTreeMap::new(),
            network_policies_by_name: BTreeMap::new(),
            system_configuration,
            default_privileges: DefaultPrivileges::default(),
            system_privileges: PrivilegeMap::default(),
            comments: CommentsMap::default(),
            source_references: BTreeMap::new(),
            storage_metadata: Default::default(),
            temporary_schemas: BTreeMap::new(),
            mock_authentication_nonce: Default::default(),
            config: mz_sql::catalog::CatalogConfig {
                start_time: to_datetime((config.now)()),
                start_instant: Instant::now(),
                nonce: rand::random(),
                environment_id: config.environment_id,
                session_id: Uuid::new_v4(),
                build_info: config.build_info,
                timestamp_interval: Duration::from_secs(1),
                now: config.now.clone(),
                connection_context: config.connection_context,
                builtins_cfg: BuiltinsConfig {
                    // Whether continual task builtins exist is decided from the
                    // defaults plus any remote parameter overrides, before the
                    // system config has been loaded into `state`.
                    include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
                        &config.system_parameter_defaults,
                        config.remote_system_parameters.as_ref(),
                        &ENABLE_CONTINUAL_TASK_BUILTINS,
                    ),
                },
                helm_chart_version: config.helm_chart_version,
            },
            cluster_replica_sizes: config.cluster_replica_sizes,
            availability_zones: config.availability_zones,
            egress_addresses: config.egress_addresses,
            aws_principal_context: config.aws_principal_context,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            http_host_name: config.http_host_name,
            license_key: config.license_key,
        };

        // Snapshot the entire durable catalog, then open a transaction in which
        // all startup migrations below are staged.
        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
        let mut txn = storage.transaction().await?;

        // Run the migrations that operate purely on durable state (no in-memory
        // catalog needed yet).
        let (migrated_builtins, new_builtin_collections) = {
            migrate::durable_migrate(
                &mut txn,
                state.config.environment_id.organization_id(),
                config.boot_ts,
            )?;
            // Persist any remotely-synced system parameters before computing
            // builtin migrations, since builtins can depend on them.
            if let Some(remote_system_parameters) = config.remote_system_parameters {
                for (name, value) in remote_system_parameters {
                    txn.upsert_system_config(&name, value)?;
                }
                txn.set_system_config_synced_once()?;
            }
            let (migrated_builtins, new_builtin_collections) =
                add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
                system_cluster: config.builtin_system_cluster_config,
                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
                probe_cluster: config.builtin_probe_cluster_config,
                support_cluster: config.builtin_support_cluster_config,
                analytics_cluster: config.builtin_analytics_cluster_config,
            };
            add_new_remove_old_builtin_clusters_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
            )?;
            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
            add_new_remove_old_builtin_cluster_replicas_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
            )?;
            add_new_remove_old_builtin_roles_migration(&mut txn)?;
            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
            (migrated_builtins, new_builtin_collections)
        };
        remove_pending_cluster_replicas_migration(&mut txn)?;

        // Fold the updates staged by the migrations above into the snapshot.
        let op_updates = txn.get_and_commit_op_updates();
        updates.extend(op_updates);

        let mut builtin_table_updates = Vec::new();

        {
            // Seed system parameter defaults. Unknown parameters are only warned
            // about so that old durable defaults don't wedge startup.
            for (name, value) in config.system_parameter_defaults {
                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
                    Ok(_) => (),
                    Err(Error {
                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
                    }) => {
                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
                    }
                    Err(e) => return Err(e.into()),
                };
            }
            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
        }

        // Consolidate the snapshot; after consolidation all diffs must be +1.
        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
        differential_dataflow::consolidation::consolidate_updates(&mut updates);
        soft_assert_no_log!(
            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "consolidated updates should be positive during startup: {updates:?}"
        );

        // Partition the updates so they can be applied in dependency order:
        // roles/databases/clusters first, then system items, then user items,
        // then metadata that references items, with audit log events handled
        // separately (they only generate builtin table rows).
        let mut pre_item_updates = Vec::new();
        let mut system_item_updates = Vec::new();
        let mut item_updates = Vec::new();
        let mut post_item_updates = Vec::new();
        let mut audit_log_updates = Vec::new();
        for (kind, ts, diff) in updates {
            match kind {
                BootstrapStateUpdateKind::Role(_)
                | BootstrapStateUpdateKind::RoleAuth(_)
                | BootstrapStateUpdateKind::Database(_)
                | BootstrapStateUpdateKind::Schema(_)
                | BootstrapStateUpdateKind::DefaultPrivilege(_)
                | BootstrapStateUpdateKind::SystemPrivilege(_)
                | BootstrapStateUpdateKind::SystemConfiguration(_)
                | BootstrapStateUpdateKind::Cluster(_)
                | BootstrapStateUpdateKind::NetworkPolicy(_)
                | BootstrapStateUpdateKind::ClusterReplica(_) => {
                    pre_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
                    system_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
                    kind: kind.into(),
                    ts,
                    diff: diff.try_into().expect("valid diff"),
                }),
                BootstrapStateUpdateKind::Comment(_)
                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
                | BootstrapStateUpdateKind::SourceReferences(_)
                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
                    // Kept in raw (kind, ts, diff) form: item migrations below
                    // may append more of these before they are applied.
                    post_item_updates.push((kind, ts, diff));
                }
                BootstrapStateUpdateKind::AuditLog(_) => {
                    audit_log_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    });
                }
            }
        }

        let builtin_table_update = state
            .apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        {
            // If an external login password for `mz_system` was supplied, install
            // its SCRAM hash in memory and surface it in the builtin tables.
            if let Some(password) = config.external_login_password_mz_system {
                let role_auth = RoleAuth {
                    role_id: MZ_SYSTEM_ROLE_ID,
                    password_hash: Some(scram256_hash(&password).map_err(|_| {
                        AdapterError::Internal("Failed to hash mz_system password.".to_owned())
                    })?),
                    updated_at: SYSTEM_TIME(),
                };
                state
                    .role_auth_by_id
                    .insert(MZ_SYSTEM_ROLE_ID, role_auth.clone());
                let builtin_table_update = state.generate_builtin_table_update(
                    mz_catalog::memory::objects::StateUpdateKind::RoleAuth(role_auth.into()),
                    mz_catalog::memory::objects::StateDiff::Addition,
                );
                builtin_table_updates.extend(builtin_table_update);
            }
        }

        // Open the expression cache (if enabled) so that item parsing below can
        // reuse previously-optimized expressions.
        let expr_cache_start = Instant::now();
        info!("startup: coordinator init: catalog open: expr cache open beginning");
        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
        let expr_cache_enabled = config
            .enable_expression_cache_override
            .unwrap_or(enable_expr_cache_dyncfg);
        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
            info!(
                ?config.enable_expression_cache_override,
                ?enable_expr_cache_dyncfg,
                "using expression cache for startup"
            );
            // All global IDs that are currently valid: every item version plus
            // every system object mapping. Cache entries outside this set are
            // stale.
            let current_ids = txn
                .get_items()
                .flat_map(|item| {
                    let gid = item.global_id.clone();
                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
                    std::iter::once(gid).chain(gids.into_iter())
                })
                .chain(
                    txn.get_system_object_mappings()
                        .map(|som| som.unique_identifier.global_id),
                )
                .collect();
            let dyncfgs = config.persist_client.dyncfgs().clone();
            // Dev builds key the cache on the full build version (including the
            // build ID) so that every rebuild gets a fresh cache namespace.
            let build_version = if config.build_info.is_dev() {
                config
                    .build_info
                    .semver_version_build()
                    .expect("build ID is not available on your platform!")
            } else {
                config.build_info.semver_version()
            };
            let expr_cache_config = ExpressionCacheConfig {
                build_version,
                shard_id: txn
                    .get_expression_cache_shard()
                    .expect("expression cache shard should exist for opened catalogs"),
                persist: config.persist_client,
                current_ids,
                remove_prior_versions: !config.read_only,
                compact_shard: config.read_only,
                dyncfgs,
            };
            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
            (
                Some(expr_cache_handle),
                cached_local_exprs,
                cached_global_exprs,
            )
        } else {
            (None, BTreeMap::new(), BTreeMap::new())
        };
        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
        info!(
            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
            expr_cache_start.elapsed()
        );

        let builtin_table_update = state
            .apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        let last_seen_version = txn
            .get_catalog_content_version()
            .unwrap_or("new")
            .to_string();

        let mz_authentication_mock_nonce =
            txn.get_authentication_mock_nonce().ok_or_else(|| {
                Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
            })?;

        state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);

        // Apply user item updates, running the item migrations unless the caller
        // explicitly opted out. Migration failures are reported together with the
        // version span being migrated across.
        let builtin_table_update = if !config.skip_migrations {
            let migrate_result = migrate::migrate(
                &mut state,
                &mut txn,
                &mut local_expr_cache,
                item_updates,
                config.now,
                config.boot_ts,
            )
            .await
            .map_err(|e| {
                Error::new(ErrorKind::FailedMigration {
                    last_seen_version: last_seen_version.clone(),
                    this_version: config.build_info.version,
                    cause: e.to_string(),
                })
            })?;
            if !migrate_result.post_item_updates.is_empty() {
                post_item_updates.extend(migrate_result.post_item_updates);
                // Advance every post-item update to the newest timestamp present
                // so the whole batch consolidates at a single time.
                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
                    for (_, ts, _) in &mut post_item_updates {
                        *ts = max_ts;
                    }
                }
                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
            }

            migrate_result.builtin_table_updates
        } else {
            state
                .apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
                .await
        };
        builtin_table_updates.extend(builtin_table_update);

        let post_item_updates = post_item_updates
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate {
                kind: kind.into(),
                ts,
                diff: diff.try_into().expect("valid diff"),
            })
            .collect();
        let builtin_table_update = state
            .apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // Audit log events are never applied to the in-memory state; they only
        // generate builtin table rows.
        for audit_log_update in audit_log_updates {
            builtin_table_updates.extend(
                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
            );
        }

        // Migrate the storage shards backing builtins whose fingerprints changed.
        let BuiltinItemMigrationResult {
            builtin_table_updates: builtin_table_update,
            migrated_storage_collections_0dt,
            cleanup_action,
        } = migrate_builtin_items(
            &mut state,
            &mut txn,
            &mut local_expr_cache,
            migrated_builtins,
            config.builtin_item_migration_config,
        )
        .await?;
        builtin_table_updates.extend(builtin_table_update);
        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);

        txn.commit(config.boot_ts).await?;

        // Only run the migration's cleanup after the transaction has durably
        // committed.
        cleanup_action.await;

        Ok(InitializeStateResult {
            state,
            migrated_storage_collections_0dt,
            new_builtin_collections: new_builtin_collections.into_iter().collect(),
            builtin_table_updates,
            last_seen_version,
            expr_cache_handle,
            cached_global_exprs,
            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
        })
    }
515
    #[instrument(name = "catalog::open")]
    /// Opens the catalog: initializes the in-memory state from durable storage
    /// (see [`Catalog::initialize_state`]) and packages it into a [`Catalog`],
    /// along with the builtin table updates the caller must apply.
    ///
    /// Returned as an explicitly boxed future to keep this large future off the
    /// caller's stack.
    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
        async move {
            let mut storage = config.storage;

            let InitializeStateResult {
                state,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                mut builtin_table_updates,
                last_seen_version: _,
                expr_cache_handle,
                cached_global_exprs,
                uncached_local_exprs,
            } =
                Self::initialize_state(config.state, &mut storage)
                    .instrument(tracing::info_span!("catalog::initialize_state"))
                    .boxed()
                    .await?;

            let catalog = Catalog {
                state,
                plans: CatalogPlans::default(),
                expr_cache_handle,
                transient_revision: 1,
                storage: Arc::new(tokio::sync::Mutex::new(storage)),
            };

            // Emit builtin table rows describing every operator implementation.
            // Operators are expected to only ever be scalar functions.
            for (op, func) in OP_IMPLS.iter() {
                match func {
                    mz_sql::func::Func::Scalar(impls) => {
                        for imp in impls {
                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
                            ));
                        }
                    }
                    _ => unreachable!("all operators must be scalar functions"),
                }
            }

            // Emit builtin table rows for each configured egress address.
            for ip in &catalog.state.egress_addresses {
                builtin_table_updates.push(
                    catalog
                        .state
                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
                );
            }

            // Emit a builtin table row for the license key, when one is present.
            if !catalog.state.license_key.id.is_empty() {
                builtin_table_updates.push(
                    catalog.state.resolve_builtin_table_update(
                        catalog
                            .state
                            .pack_license_key_update(&catalog.state.license_key)?,
                    ),
                );
            }

            catalog.storage().await.mark_bootstrap_complete();

            Ok(OpenCatalogResult {
                catalog,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                builtin_table_updates,
                cached_global_exprs,
                uncached_local_exprs,
            })
        }
        .instrument(tracing::info_span!("catalog::open"))
        .boxed()
    }
604
    /// Initializes the storage controller's durable state, within the catalog's
    /// transaction, for every catalog entry that is backed by a storage
    /// collection.
    ///
    /// The resulting updates are applied to a clone of `self.state` so that
    /// `self` is only mutated after every fallible step has succeeded.
    async fn initialize_storage_state(
        &mut self,
        storage_collections: &Arc<
            dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
        >,
    ) -> Result<(), mz_catalog::durable::CatalogError> {
        // All global IDs of catalog items backed by storage collections.
        let collections = self
            .entries()
            .filter(|entry| entry.item().is_storage_collection())
            .flat_map(|entry| entry.global_ids())
            .collect();

        // Work on a clone so a mid-way failure leaves `self.state` untouched.
        let mut state = self.state.clone();

        let mut storage = self.storage().await;
        let mut txn = storage.transaction().await?;

        storage_collections
            .initialize_state(&mut txn, collections)
            .await
            .map_err(mz_catalog::durable::DurableCatalogError::from)?;

        let updates = txn.get_and_commit_op_updates();
        let builtin_updates = state.apply_updates(updates)?;
        // Storage-state initialization must not generate builtin table changes.
        assert!(builtin_updates.is_empty());
        let commit_ts = txn.upper();
        txn.commit(commit_ts).await?;
        drop(storage);

        // Everything committed; install the updated state.
        self.state = state;
        Ok(())
    }
646
    /// Creates and bootstraps a controller against this catalog's durable state,
    /// then initializes the catalog's storage state against the new controller's
    /// storage collections.
    ///
    /// Returns the initialized controller.
    pub async fn initialize_controller(
        &mut self,
        config: mz_controller::ControllerConfig,
        envd_epoch: core::num::NonZeroI64,
        read_only: bool,
    ) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
    {
        let controller_start = Instant::now();
        info!("startup: controller init: beginning");

        let controller = {
            // Prepare durable state the controller needs; this step is expected
            // to stage no catalog updates.
            let mut storage = self.storage().await;
            let mut tx = storage.transaction().await?;
            mz_controller::prepare_initialization(&mut tx)
                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            let updates = tx.get_and_commit_op_updates();
            assert!(
                updates.is_empty(),
                "initializing controller should not produce updates: {updates:?}"
            );
            let commit_ts = tx.upper();
            tx.commit(commit_ts).await?;

            // The controller only reads from this second transaction.
            let read_only_tx = storage.transaction().await?;

            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
        };

        self.initialize_storage_state(&controller.storage_collections)
            .await?;

        info!(
            "startup: controller init: complete in {:?}",
            controller_start.elapsed()
        );

        Ok(controller)
    }
687
688 pub async fn expire(self) {
690 if let Some(storage) = Arc::into_inner(self.storage) {
693 let storage = storage.into_inner();
694 storage.expire().await;
695 }
696 }
697}
698
699impl CatalogState {
700 fn set_system_configuration_default(
702 &mut self,
703 name: &str,
704 value: VarInput,
705 ) -> Result<(), Error> {
706 Ok(self.system_configuration.set_default(name, value)?)
707 }
708}
709
/// Reconciles the builtin items compiled into this binary with the system
/// object mappings recorded in the durable catalog: allocates IDs for brand-new
/// builtins, detects builtins whose fingerprints changed (and therefore need
/// migration), refreshes builtin column comments, and removes mappings for
/// builtins that no longer exist.
///
/// Returns `(migrated_builtin_ids, new_builtin_collections)`: the catalog IDs
/// of fingerprint-changed builtins and the global IDs newly allocated for new
/// builtins.
fn add_new_remove_old_builtin_items_migration(
    builtins_cfg: &BuiltinsConfig,
    txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<(Vec<CatalogItemId>, Vec<GlobalId>), mz_catalog::durable::CatalogError> {
    let mut new_builtin_mappings = Vec::new();
    let mut migrated_builtin_ids = Vec::new();
    // Tracks descriptions seen so far, purely to detect duplicates.
    let mut builtin_descs = HashSet::new();

    // Pair each compiled-in builtin with its system object description,
    // panicking on any duplicate (schema, type, name) triple.
    let mut builtins = Vec::new();
    for builtin in BUILTINS::iter(builtins_cfg) {
        let desc = SystemObjectDescription {
            schema_name: builtin.schema().to_string(),
            object_type: builtin.catalog_item_type(),
            object_name: builtin.name().to_string(),
        };
        if !builtin_descs.insert(desc.clone()) {
            panic!(
                "duplicate builtin description: {:?}, {:?}",
                SystemObjectDescription {
                    schema_name: builtin.schema().to_string(),
                    object_type: builtin.catalog_item_type(),
                    object_name: builtin.name().to_string(),
                },
                builtin
            );
        }
        builtins.push((desc, builtin));
    }

    // Durable mappings keyed by description. Entries still present after the
    // partition below correspond to builtins that no longer exist.
    let mut system_object_mappings: BTreeMap<_, _> = txn
        .get_system_object_mappings()
        .map(|system_object_mapping| {
            (
                system_object_mapping.description.clone(),
                system_object_mapping,
            )
        })
        .collect();

    // Split builtins into those that already have a durable mapping and those
    // that are new in this version. Runtime-alterable builtins use a sentinel
    // fingerprint since their definitions can change at runtime.
    let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
        builtins.into_iter().partition_map(|(desc, builtin)| {
            let fingerprint = match builtin.runtime_alterable() {
                false => builtin.fingerprint(),
                true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
            };
            match system_object_mappings.remove(&desc) {
                Some(system_object_mapping) => {
                    Either::Left((builtin, system_object_mapping, fingerprint))
                }
                None => Either::Right((builtin, fingerprint)),
            }
        });
    // Allocate one (catalog ID, global ID) pair per new builtin.
    let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
    let new_builtins: Vec<_> = new_builtins
        .into_iter()
        .zip_eq(new_builtin_ids.clone())
        .collect();

    // An existing builtin whose fingerprint changed must be migrated — subject
    // to several invariants that would otherwise corrupt or truncate data.
    for (builtin, system_object_mapping, fingerprint) in existing_builtins.iter().cloned() {
        if system_object_mapping.unique_identifier.fingerprint != fingerprint {
            assert_ne!(
                *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, system_object_mapping.description,
                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
            );
            assert_ne!(
                builtin.catalog_item_type(),
                CatalogItemType::Type,
                "types cannot be migrated"
            );
            assert_ne!(
                system_object_mapping.unique_identifier.fingerprint,
                RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
                "clearing the runtime alterable flag on an existing object is not permitted",
            );
            assert!(
                !builtin.runtime_alterable(),
                "setting the runtime alterable flag on an existing object is not permitted"
            );
            migrated_builtin_ids.push(system_object_mapping.unique_identifier.catalog_id);
        }
    }

    // Record mappings for the new builtins. Runtime-alterable connections are
    // additionally inserted as ordinary catalog items, since their definitions
    // live in the item collection rather than in code.
    for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
        new_builtin_mappings.push(SystemObjectMapping {
            description: SystemObjectDescription {
                schema_name: builtin.schema().to_string(),
                object_type: builtin.catalog_item_type(),
                object_name: builtin.name().to_string(),
            },
            unique_identifier: SystemObjectUniqueIdentifier {
                catalog_id,
                global_id,
                fingerprint,
            },
        });

        let handled_runtime_alterable = match builtin {
            Builtin::Connection(c) if c.runtime_alterable => {
                // Owner privilege plus the connection's declared access list.
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    c.owner_id.clone(),
                )];
                acl_items.extend_from_slice(c.access);
                let versions = BTreeMap::new();

                txn.insert_item(
                    catalog_id,
                    c.oid,
                    global_id,
                    mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
                    c.name,
                    c.sql.into(),
                    *c.owner_id,
                    acl_items,
                    versions,
                )?;
                true
            }
            _ => false,
        };
        // Guard against adding a runtime-alterable builtin kind that this
        // migration doesn't know how to insert.
        assert_eq!(
            builtin.runtime_alterable(),
            handled_runtime_alterable,
            "runtime alterable object was not handled by migration",
        );
    }
    txn.set_system_object_mappings(new_builtin_mappings)?;

    // Every live builtin (existing or new) paired with its catalog ID, for the
    // comment refresh below.
    let builtins_with_catalog_ids = existing_builtins
        .iter()
        .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
        .chain(
            new_builtins
                .into_iter()
                .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
        );

    // Re-write column comments for builtin sources/views/tables from the
    // compiled-in definitions; other builtin kinds carry no column comments.
    for (builtin, id) in builtins_with_catalog_ids {
        let (comment_id, desc, comments) = match builtin {
            Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
            Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
            Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
            Builtin::Log(_)
            | Builtin::Type(_)
            | Builtin::Func(_)
            | Builtin::ContinualTask(_)
            | Builtin::Index(_)
            | Builtin::Connection(_) => continue,
        };
        // Drop any stale comments before re-inserting the current ones.
        txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;

        let mut comments = comments.clone();
        for (col_idx, name) in desc.iter_names().enumerate() {
            if let Some(comment) = comments.remove(name.as_str()) {
                // Comment positions are 1-based column indexes.
                txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
            }
        }
        assert!(
            comments.is_empty(),
            "builtin object contains dangling comments that don't correspond to columns {comments:?}"
        );
    }

    // Anything left in `system_object_mappings` is a builtin that no longer
    // exists: collect its mapping, its item row (for runtime-alterable
    // builtins), and its comments for removal.
    let mut deleted_system_objects = BTreeSet::new();
    let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
    let mut deleted_comments = BTreeSet::new();
    for (desc, mapping) in system_object_mappings {
        deleted_system_objects.insert(mapping.description);
        if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
            deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
        }

        let id = mapping.unique_identifier.catalog_id;
        let comment_id = match desc.object_type {
            CatalogItemType::Table => CommentObjectId::Table(id),
            CatalogItemType::Source => CommentObjectId::Source(id),
            CatalogItemType::View => CommentObjectId::View(id),
            CatalogItemType::Sink
            | CatalogItemType::MaterializedView
            | CatalogItemType::Index
            | CatalogItemType::Type
            | CatalogItemType::Func
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => continue,
        };
        deleted_comments.insert(comment_id);
    }
    // Escape hatch for specific objects that may be deleted from stable
    // schemas; currently empty.
    let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
    // Deleting from a stable schema would be a backwards-incompatible change,
    // so only unstable-schema objects (and indexes) may be removed.
    assert!(
        deleted_system_objects
            .iter()
            .filter(|object| object.object_type != CatalogItemType::Index)
            .all(
                |deleted_object| is_unstable_schema(&deleted_object.schema_name)
                    || delete_exceptions.contains(deleted_object)
            ),
        "only objects in unstable schemas can be deleted, deleted objects: {:?}",
        deleted_system_objects
    );
    txn.drop_comments(&deleted_comments)?;
    txn.remove_items(&deleted_runtime_alterable_system_ids)?;
    txn.remove_system_object_mappings(deleted_system_objects)?;

    // Global IDs of the newly allocated builtin collections.
    let new_builtin_collections = new_builtin_ids
        .into_iter()
        .map(|(_catalog_id, global_id)| global_id)
        .collect();

    Ok((migrated_builtin_ids, new_builtin_collections))
}
956
957fn add_new_remove_old_builtin_clusters_migration(
958 txn: &mut mz_catalog::durable::Transaction<'_>,
959 builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
960) -> Result<(), mz_catalog::durable::CatalogError> {
961 let mut durable_clusters: BTreeMap<_, _> = txn
962 .get_clusters()
963 .filter(|cluster| cluster.id.is_system())
964 .map(|cluster| (cluster.name.to_string(), cluster))
965 .collect();
966
967 for builtin_cluster in BUILTIN_CLUSTERS {
969 if durable_clusters.remove(builtin_cluster.name).is_none() {
970 let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
971
972 txn.insert_system_cluster(
973 builtin_cluster.name,
974 vec![],
975 builtin_cluster.privileges.to_vec(),
976 builtin_cluster.owner_id.to_owned(),
977 mz_catalog::durable::ClusterConfig {
978 variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
979 size: cluster_config.size,
980 availability_zones: vec![],
981 replication_factor: cluster_config.replication_factor,
982 logging: default_logging_config(),
983 optimizer_feature_overrides: Default::default(),
984 schedule: Default::default(),
985 }),
986 workload_class: None,
987 },
988 &HashSet::new(),
989 )?;
990 }
991 }
992
993 let old_clusters = durable_clusters
995 .values()
996 .map(|cluster| cluster.id)
997 .collect();
998 txn.remove_clusters(&old_clusters)?;
999
1000 Ok(())
1001}
1002
1003fn add_new_remove_old_builtin_introspection_source_migration(
1004 txn: &mut mz_catalog::durable::Transaction<'_>,
1005) -> Result<(), AdapterError> {
1006 let mut new_indexes = Vec::new();
1007 let mut removed_indexes = BTreeSet::new();
1008 for cluster in txn.get_clusters() {
1009 let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
1010
1011 let mut new_logs = Vec::new();
1012
1013 for log in BUILTINS::logs() {
1014 if introspection_source_index_ids.remove(log.name).is_none() {
1015 new_logs.push(log);
1016 }
1017 }
1018
1019 for log in new_logs {
1020 let (item_id, gid) =
1021 Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1022 new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1023 }
1024
1025 removed_indexes.extend(
1028 introspection_source_index_ids
1029 .into_keys()
1030 .map(|name| (cluster.id, name.to_string())),
1031 );
1032 }
1033 txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1034 txn.remove_introspection_source_indexes(removed_indexes)?;
1035 Ok(())
1036}
1037
1038fn add_new_remove_old_builtin_roles_migration(
1039 txn: &mut mz_catalog::durable::Transaction<'_>,
1040) -> Result<(), mz_catalog::durable::CatalogError> {
1041 let mut durable_roles: BTreeMap<_, _> = txn
1042 .get_roles()
1043 .filter(|role| role.id.is_system() || role.id.is_predefined())
1044 .map(|role| (role.name.to_string(), role))
1045 .collect();
1046
1047 for builtin_role in BUILTIN_ROLES {
1049 if durable_roles.remove(builtin_role.name).is_none() {
1050 txn.insert_builtin_role(
1051 builtin_role.id,
1052 builtin_role.name.to_string(),
1053 builtin_role.attributes.clone(),
1054 RoleMembership::new(),
1055 RoleVars::default(),
1056 builtin_role.oid,
1057 )?;
1058 }
1059 }
1060
1061 let old_roles = durable_roles.values().map(|role| role.id).collect();
1063 txn.remove_roles(&old_roles)?;
1064
1065 Ok(())
1066}
1067
1068fn add_new_remove_old_builtin_cluster_replicas_migration(
1069 txn: &mut Transaction<'_>,
1070 builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1071) -> Result<(), AdapterError> {
1072 let cluster_lookup: BTreeMap<_, _> = txn
1073 .get_clusters()
1074 .map(|cluster| (cluster.name.clone(), cluster.clone()))
1075 .collect();
1076
1077 let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1078 .get_cluster_replicas()
1079 .filter(|replica| replica.replica_id.is_system())
1080 .fold(BTreeMap::new(), |mut acc, replica| {
1081 acc.entry(replica.cluster_id)
1082 .or_insert_with(BTreeMap::new)
1083 .insert(replica.name.to_string(), replica);
1084 acc
1085 });
1086
1087 for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1089 let cluster = cluster_lookup
1090 .get(builtin_replica.cluster_name)
1091 .expect("builtin cluster replica references non-existent cluster");
1092 let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1094 let replica_names = durable_replicas
1095 .get_mut(&cluster.id)
1096 .unwrap_or(&mut empty_map);
1097
1098 let builtin_cluster_boostrap_config =
1099 builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1100 if replica_names.remove(builtin_replica.name).is_none()
1101 && builtin_cluster_boostrap_config.replication_factor > 0
1105 {
1106 let replica_size = match cluster.config.variant {
1107 ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1108 ClusterVariant::Unmanaged => builtin_cluster_boostrap_config.size,
1109 };
1110
1111 let config = builtin_cluster_replica_config(replica_size);
1112 txn.insert_cluster_replica(
1113 cluster.id,
1114 builtin_replica.name,
1115 config,
1116 MZ_SYSTEM_ROLE_ID,
1117 )?;
1118 }
1119 }
1120
1121 let old_replicas = durable_replicas
1123 .values()
1124 .flat_map(|replicas| replicas.values().map(|replica| replica.replica_id))
1125 .collect();
1126 txn.remove_cluster_replicas(&old_replicas)?;
1127
1128 Ok(())
1129}
1130
1131fn remove_invalid_config_param_role_defaults_migration(
1138 txn: &mut Transaction<'_>,
1139) -> Result<(), AdapterError> {
1140 static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1141
1142 let roles_to_migrate: BTreeMap<_, _> = txn
1143 .get_roles()
1144 .filter_map(|mut role| {
1145 let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1150
1151 let mut invalid_roles_vars = BTreeMap::new();
1153 for (name, value) in &role.vars.map {
1154 let Ok(session_var) = session_vars.inspect(name) else {
1156 invalid_roles_vars.insert(name.clone(), value.clone());
1157 continue;
1158 };
1159 if session_var.check(value.borrow()).is_err() {
1160 invalid_roles_vars.insert(name.clone(), value.clone());
1161 }
1162 }
1163
1164 if invalid_roles_vars.is_empty() {
1166 return None;
1167 }
1168
1169 tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1170
1171 for (name, _value) in invalid_roles_vars {
1173 role.vars.map.remove(&name);
1174 }
1175 Some(role)
1176 })
1177 .map(|role| (role.id, role))
1178 .collect();
1179
1180 txn.update_roles_without_auth(roles_to_migrate)?;
1181
1182 Ok(())
1183}
1184
1185fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
1188 for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1189 if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1190 replica.config.location
1191 {
1192 tx.remove_cluster_replica(replica.replica_id)?;
1193 }
1194 }
1195 Ok(())
1196}
1197
1198pub(crate) fn builtin_cluster_replica_config(
1199 replica_size: String,
1200) -> mz_catalog::durable::ReplicaConfig {
1201 mz_catalog::durable::ReplicaConfig {
1202 location: mz_catalog::durable::ReplicaLocation::Managed {
1203 availability_zone: None,
1204 billed_as: None,
1205 pending: false,
1206 internal: false,
1207 size: replica_size,
1208 },
1209 logging: default_logging_config(),
1210 }
1211}
1212
1213fn default_logging_config() -> ReplicaLogging {
1214 ReplicaLogging {
1215 log_logging: false,
1216 interval: Some(Duration::from_secs(1)),
1217 }
1218}
1219
/// The bootstrap configuration for each builtin cluster, used when (re)creating
/// builtin cluster replicas during catalog open.
///
/// Lookup by cluster name is provided by [`BuiltinBootstrapClusterConfigMap::get_config`].
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    // Bootstrap config for the cluster named by `MZ_SYSTEM_CLUSTER`.
    pub system_cluster: BootstrapBuiltinClusterConfig,
    // Bootstrap config for the cluster named by `MZ_CATALOG_SERVER_CLUSTER`.
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    // Bootstrap config for the cluster named by `MZ_PROBE_CLUSTER`.
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    // Bootstrap config for the cluster named by `MZ_SUPPORT_CLUSTER`.
    pub support_cluster: BootstrapBuiltinClusterConfig,
    // Bootstrap config for the cluster named by `MZ_ANALYTICS_CLUSTER`.
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1233
1234impl BuiltinBootstrapClusterConfigMap {
1235 fn get_config(
1237 &self,
1238 cluster_name: &str,
1239 ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1240 let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1241 &self.system_cluster
1242 } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1243 &self.catalog_server_cluster
1244 } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1245 &self.probe_cluster
1246 } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1247 &self.support_cluster
1248 } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1249 &self.analytics_cluster
1250 } else {
1251 return Err(mz_catalog::durable::CatalogError::Catalog(
1252 SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1253 ));
1254 };
1255 Ok(cluster_config.clone())
1256 }
1257}
1258
1259pub(crate) fn into_consolidatable_updates_startup(
1276 updates: Vec<StateUpdate>,
1277 ts: Timestamp,
1278) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1279 updates
1280 .into_iter()
1281 .map(|StateUpdate { kind, ts: _, diff }| {
1282 let kind: BootstrapStateUpdateKind = kind
1283 .try_into()
1284 .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1285 (kind, ts, Diff::from(diff))
1286 })
1287 .collect()
1288}
1289
1290fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1291 defaults: &BTreeMap<String, String>,
1292 remote: Option<&BTreeMap<String, String>>,
1293 cfg: &mz_dyncfg::Config<T>,
1294) -> T::ConfigType {
1295 let mut val = T::into_config_type(cfg.default().clone());
1296 let get_fn = |map: &BTreeMap<String, String>| {
1297 let val = map.get(cfg.name())?;
1298 match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1299 Ok(x) => Some(x),
1300 Err(err) => {
1301 tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1302 None
1303 }
1304 }
1305 };
1306 if let Some(x) = get_fn(defaults) {
1307 val = x;
1308 }
1309 if let Some(x) = remote.and_then(get_fn) {
1310 val = x;
1311 }
1312 val
1313}