mz_adapter/catalog/
open.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to opening a [`Catalog`].
11
12mod builtin_schema_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU32;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use futures::future::{BoxFuture, FutureExt};
20use itertools::{Either, Itertools};
21use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
22use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
23use mz_auth::hash::scram256_hash;
24use mz_catalog::SYSTEM_CONN_ID;
25use mz_catalog::builtin::{
26    BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
27    Fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
28};
29use mz_catalog::config::StateConfig;
30use mz_catalog::durable::objects::{
31    SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
32};
33use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
34use mz_catalog::expr_cache::{
35    ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
36};
37use mz_catalog::memory::error::{Error, ErrorKind};
38use mz_catalog::memory::objects::{
39    BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
40};
41use mz_controller::clusters::ReplicaLogging;
42use mz_controller_types::ClusterId;
43use mz_ore::cast::usize_to_u64;
44use mz_ore::collections::HashSet;
45use mz_ore::now::{SYSTEM_TIME, to_datetime};
46use mz_ore::{instrument, soft_assert_no_log};
47use mz_repr::adt::mz_acl_item::PrivilegeMap;
48use mz_repr::namespaces::is_unstable_schema;
49use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
50use mz_sql::catalog::{
51    BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
52};
53use mz_sql::func::OP_IMPLS;
54use mz_sql::names::CommentObjectId;
55use mz_sql::rbac;
56use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
57use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
58use mz_storage_client::storage_collections::StorageCollections;
59use tracing::{Instrument, info, warn};
60use uuid::Uuid;
61
62// DO NOT add any more imports from `crate` outside of `crate::catalog`.
63use crate::AdapterError;
64use crate::catalog::migrate::{self, get_migration_version, set_migration_version};
65use crate::catalog::state::LocalExpressionCache;
66use crate::catalog::{
67    BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name,
68};
69
70pub struct InitializeStateResult {
71    /// An initialized [`CatalogState`].
72    pub state: CatalogState,
73    /// A set of new shards that may need to be initialized (only used by 0dt migration).
74    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
75    /// A set of new builtin items.
76    pub new_builtin_collections: BTreeSet<GlobalId>,
77    /// A list of builtin table updates corresponding to the initialized state.
78    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
79    /// The version of the catalog that existed before initializing the catalog.
80    pub last_seen_version: String,
81    /// A handle to the expression cache if it's enabled.
82    pub expr_cache_handle: Option<ExpressionCacheHandle>,
83    /// The global expressions that were cached in `expr_cache_handle`.
84    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
85    /// The local expressions that were NOT cached in `expr_cache_handle`.
86    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
87}
88
89pub struct OpenCatalogResult {
90    /// An opened [`Catalog`].
91    pub catalog: Catalog,
92    /// A set of new shards that may need to be initialized.
93    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
94    /// A set of new builtin items.
95    pub new_builtin_collections: BTreeSet<GlobalId>,
96    /// A list of builtin table updates corresponding to the initialized state.
97    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
98    /// The global expressions that were cached in the expression cache.
99    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
100    /// The local expressions that were NOT cached in the expression cache.
101    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
102}
103
104impl Catalog {
105    /// Initializes a CatalogState. Separate from [`Catalog::open`] to avoid depending on state
106    /// external to a [mz_catalog::durable::DurableCatalogState]
107    /// (for example: no [mz_secrets::SecretsReader]).
108    pub async fn initialize_state<'a>(
109        config: StateConfig,
110        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
111    ) -> Result<InitializeStateResult, AdapterError> {
112        for builtin_role in BUILTIN_ROLES {
113            assert!(
114                is_reserved_name(builtin_role.name),
115                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
116                BUILTIN_PREFIXES.join(", ")
117            );
118        }
119        for builtin_cluster in BUILTIN_CLUSTERS {
120            assert!(
121                is_reserved_name(builtin_cluster.name),
122                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
123                BUILTIN_PREFIXES.join(", ")
124            );
125        }
126
127        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
128        if config.all_features {
129            system_configuration.enable_all_feature_flags_by_default();
130        }
131
132        let mut state = CatalogState {
133            database_by_name: BTreeMap::new(),
134            database_by_id: BTreeMap::new(),
135            entry_by_id: BTreeMap::new(),
136            entry_by_global_id: BTreeMap::new(),
137            ambient_schemas_by_name: BTreeMap::new(),
138            ambient_schemas_by_id: BTreeMap::new(),
139            clusters_by_name: BTreeMap::new(),
140            clusters_by_id: BTreeMap::new(),
141            roles_by_name: BTreeMap::new(),
142            roles_by_id: BTreeMap::new(),
143            network_policies_by_id: BTreeMap::new(),
144            role_auth_by_id: BTreeMap::new(),
145            network_policies_by_name: BTreeMap::new(),
146            system_configuration,
147            default_privileges: DefaultPrivileges::default(),
148            system_privileges: PrivilegeMap::default(),
149            comments: CommentsMap::default(),
150            source_references: BTreeMap::new(),
151            storage_metadata: Default::default(),
152            temporary_schemas: BTreeMap::new(),
153            mock_authentication_nonce: Default::default(),
154            config: mz_sql::catalog::CatalogConfig {
155                start_time: to_datetime((config.now)()),
156                start_instant: Instant::now(),
157                nonce: rand::random(),
158                environment_id: config.environment_id,
159                session_id: Uuid::new_v4(),
160                build_info: config.build_info,
161                timestamp_interval: Duration::from_secs(1),
162                now: config.now.clone(),
163                connection_context: config.connection_context,
164                builtins_cfg: BuiltinsConfig {
165                    // This will fall back to the default in code (false) if we timeout on the
166                    // initial LD sync. We're just using this to get some additional testing in on
167                    // CTs so a false negative is fine, we're only worried about false positives.
168                    include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
169                        &config.system_parameter_defaults,
170                        config.remote_system_parameters.as_ref(),
171                        &ENABLE_CONTINUAL_TASK_BUILTINS,
172                    ),
173                },
174                helm_chart_version: config.helm_chart_version,
175            },
176            cluster_replica_sizes: config.cluster_replica_sizes,
177            availability_zones: config.availability_zones,
178            egress_addresses: config.egress_addresses,
179            aws_principal_context: config.aws_principal_context,
180            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
181            http_host_name: config.http_host_name,
182            license_key: config.license_key,
183        };
184
185        let deploy_generation = storage.get_deployment_generation().await?;
186
187        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
188        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
189        let mut txn = storage.transaction().await?;
190
191        // Migrate/update durable data before we start loading the in-memory catalog.
192        let new_builtin_collections = {
193            migrate::durable_migrate(
194                &mut txn,
195                state.config.environment_id.organization_id(),
196                config.boot_ts,
197            )?;
198            // Overwrite and persist selected parameter values in `remote_system_parameters` that
199            // was pulled from a remote frontend (e.g. LaunchDarkly) if present.
200            if let Some(remote_system_parameters) = config.remote_system_parameters {
201                for (name, value) in remote_system_parameters {
202                    txn.upsert_system_config(&name, value)?;
203                }
204                txn.set_system_config_synced_once()?;
205            }
206            // Add any new builtin objects and remove old ones.
207            let new_builtin_collections =
208                add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
209            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
210                system_cluster: config.builtin_system_cluster_config,
211                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
212                probe_cluster: config.builtin_probe_cluster_config,
213                support_cluster: config.builtin_support_cluster_config,
214                analytics_cluster: config.builtin_analytics_cluster_config,
215            };
216            add_new_remove_old_builtin_clusters_migration(
217                &mut txn,
218                &builtin_bootstrap_cluster_config_map,
219            )?;
220            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
221            add_new_remove_old_builtin_cluster_replicas_migration(
222                &mut txn,
223                &builtin_bootstrap_cluster_config_map,
224            )?;
225            add_new_remove_old_builtin_roles_migration(&mut txn)?;
226            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
227            remove_pending_cluster_replicas_migration(&mut txn)?;
228
229            new_builtin_collections
230        };
231
232        let op_updates = txn.get_and_commit_op_updates();
233        updates.extend(op_updates);
234
235        let mut builtin_table_updates = Vec::new();
236
237        // Seed the in-memory catalog with values that don't come from the durable catalog.
238        {
239            // Set defaults from configuration passed in the provided `system_parameter_defaults`
240            // map.
241            for (name, value) in config.system_parameter_defaults {
242                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
243                    Ok(_) => (),
244                    Err(Error {
245                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
246                    }) => {
247                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
248                    }
249                    Err(e) => return Err(e.into()),
250                };
251            }
252            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
253        }
254
255        // Make life easier by consolidating all updates, so that we end up with only positive
256        // diffs.
257        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
258        differential_dataflow::consolidation::consolidate_updates(&mut updates);
259        soft_assert_no_log!(
260            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
261            "consolidated updates should be positive during startup: {updates:?}"
262        );
263
264        let mut pre_item_updates = Vec::new();
265        let mut system_item_updates = Vec::new();
266        let mut item_updates = Vec::new();
267        let mut post_item_updates = Vec::new();
268        let mut audit_log_updates = Vec::new();
269        for (kind, ts, diff) in updates {
270            match kind {
271                BootstrapStateUpdateKind::Role(_)
272                | BootstrapStateUpdateKind::RoleAuth(_)
273                | BootstrapStateUpdateKind::Database(_)
274                | BootstrapStateUpdateKind::Schema(_)
275                | BootstrapStateUpdateKind::DefaultPrivilege(_)
276                | BootstrapStateUpdateKind::SystemPrivilege(_)
277                | BootstrapStateUpdateKind::SystemConfiguration(_)
278                | BootstrapStateUpdateKind::Cluster(_)
279                | BootstrapStateUpdateKind::NetworkPolicy(_)
280                | BootstrapStateUpdateKind::ClusterReplica(_) => {
281                    pre_item_updates.push(StateUpdate {
282                        kind: kind.into(),
283                        ts,
284                        diff: diff.try_into().expect("valid diff"),
285                    })
286                }
287                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
288                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
289                    system_item_updates.push(StateUpdate {
290                        kind: kind.into(),
291                        ts,
292                        diff: diff.try_into().expect("valid diff"),
293                    })
294                }
295                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
296                    kind: kind.into(),
297                    ts,
298                    diff: diff.try_into().expect("valid diff"),
299                }),
300                BootstrapStateUpdateKind::Comment(_)
301                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
302                | BootstrapStateUpdateKind::SourceReferences(_)
303                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
304                    post_item_updates.push((kind, ts, diff));
305                }
306                BootstrapStateUpdateKind::AuditLog(_) => {
307                    audit_log_updates.push(StateUpdate {
308                        kind: kind.into(),
309                        ts,
310                        diff: diff.try_into().expect("valid diff"),
311                    });
312                }
313            }
314        }
315
316        let builtin_table_update = state
317            .apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
318            .await;
319        builtin_table_updates.extend(builtin_table_update);
320
321        // Ensure mz_system has a password if configured to have one.
322        // It's important we do this after the `pre_item_updates` so that
323        // the mz_system role exists in the catalog.
324        {
325            if let Some(password) = config.external_login_password_mz_system {
326                let role_auth = RoleAuth {
327                    role_id: MZ_SYSTEM_ROLE_ID,
328                    // builtin roles should always use a secure scram iteration
329                    // <https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html>
330                    password_hash: Some(
331                        scram256_hash(&password, &NonZeroU32::new(600_000).expect("known valid"))
332                            .map_err(|_| {
333                            AdapterError::Internal("Failed to hash mz_system password.".to_owned())
334                        })?,
335                    ),
336                    updated_at: SYSTEM_TIME(),
337                };
338                state
339                    .role_auth_by_id
340                    .insert(MZ_SYSTEM_ROLE_ID, role_auth.clone());
341                let builtin_table_update = state.generate_builtin_table_update(
342                    mz_catalog::memory::objects::StateUpdateKind::RoleAuth(role_auth.into()),
343                    mz_catalog::memory::objects::StateDiff::Addition,
344                );
345                builtin_table_updates.extend(builtin_table_update);
346            }
347        }
348
349        let expr_cache_start = Instant::now();
350        info!("startup: coordinator init: catalog open: expr cache open beginning");
351        // We wait until after the `pre_item_updates` to open the cache so we can get accurate
352        // dyncfgs because the `pre_item_updates` contains `SystemConfiguration` updates.
353        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
354        let expr_cache_enabled = config
355            .enable_expression_cache_override
356            .unwrap_or(enable_expr_cache_dyncfg);
357        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
358            info!(
359                ?config.enable_expression_cache_override,
360                ?enable_expr_cache_dyncfg,
361                "using expression cache for startup"
362            );
363            let current_ids = txn
364                .get_items()
365                .flat_map(|item| {
366                    let gid = item.global_id.clone();
367                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
368                    std::iter::once(gid).chain(gids.into_iter())
369                })
370                .chain(
371                    txn.get_system_object_mappings()
372                        .map(|som| som.unique_identifier.global_id),
373                )
374                .collect();
375            let dyncfgs = config.persist_client.dyncfgs().clone();
376            let build_version = if config.build_info.is_dev() {
377                // A single dev version can be used for many different builds, so we need to use
378                // the build version that is also enriched with build metadata.
379                config
380                    .build_info
381                    .semver_version_build()
382                    .expect("build ID is not available on your platform!")
383            } else {
384                config.build_info.semver_version()
385            };
386            let expr_cache_config = ExpressionCacheConfig {
387                build_version,
388                shard_id: txn
389                    .get_expression_cache_shard()
390                    .expect("expression cache shard should exist for opened catalogs"),
391                persist: config.persist_client,
392                current_ids,
393                remove_prior_versions: !config.read_only,
394                compact_shard: config.read_only,
395                dyncfgs,
396            };
397            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
398                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
399            (
400                Some(expr_cache_handle),
401                cached_local_exprs,
402                cached_global_exprs,
403            )
404        } else {
405            (None, BTreeMap::new(), BTreeMap::new())
406        };
407        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
408        info!(
409            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
410            expr_cache_start.elapsed()
411        );
412
413        let builtin_table_update = state
414            .apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
415            .await;
416        builtin_table_updates.extend(builtin_table_update);
417
418        let last_seen_version =
419            get_migration_version(&txn).map_or_else(|| "new".into(), |v| v.to_string());
420
421        let mz_authentication_mock_nonce =
422            txn.get_authentication_mock_nonce().ok_or_else(|| {
423                Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
424            })?;
425
426        state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);
427
428        // Migrate item ASTs.
429        let builtin_table_update = if !config.skip_migrations {
430            let migrate_result = migrate::migrate(
431                &mut state,
432                &mut txn,
433                &mut local_expr_cache,
434                item_updates,
435                config.now,
436                config.boot_ts,
437            )
438            .await
439            .map_err(|e| {
440                Error::new(ErrorKind::FailedCatalogMigration {
441                    last_seen_version: last_seen_version.clone(),
442                    this_version: config.build_info.version,
443                    cause: e.to_string(),
444                })
445            })?;
446            if !migrate_result.post_item_updates.is_empty() {
447                // Include any post-item-updates generated by migrations, and then consolidate
448                // them to ensure diffs are all positive.
449                post_item_updates.extend(migrate_result.post_item_updates);
450                // Push everything to the same timestamp so it consolidates cleanly.
451                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
452                    for (_, ts, _) in &mut post_item_updates {
453                        *ts = max_ts;
454                    }
455                }
456                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
457            }
458
459            migrate_result.builtin_table_updates
460        } else {
461            state
462                .apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
463                .await
464        };
465        builtin_table_updates.extend(builtin_table_update);
466
467        let post_item_updates = post_item_updates
468            .into_iter()
469            .map(|(kind, ts, diff)| StateUpdate {
470                kind: kind.into(),
471                ts,
472                diff: diff.try_into().expect("valid diff"),
473            })
474            .collect();
475        let builtin_table_update = state
476            .apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
477            .await;
478        builtin_table_updates.extend(builtin_table_update);
479
480        // We don't need to apply the audit logs in memory, yet apply can be expensive when the
481        // audit log grows large. Therefore, we skip the apply step and just generate the builtin
482        // updates.
483        for audit_log_update in audit_log_updates {
484            builtin_table_updates.extend(
485                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
486            );
487        }
488
489        // Migrate builtin items.
490        let schema_migration_result = builtin_schema_migration::run(
491            config.build_info,
492            deploy_generation,
493            &mut txn,
494            config.builtin_item_migration_config,
495        )
496        .await?;
497
498        let state_updates = txn.get_and_commit_op_updates();
499        let table_updates = state
500            .apply_updates_for_bootstrap(state_updates, &mut local_expr_cache)
501            .await;
502        builtin_table_updates.extend(table_updates);
503        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);
504
505        // Bump the migration version immediately before committing.
506        set_migration_version(&mut txn, config.build_info.semver_version())?;
507
508        txn.commit(config.boot_ts).await?;
509
510        // Now that the migration is durable, run any requested deferred cleanup.
511        schema_migration_result.cleanup_action.await;
512
513        Ok(InitializeStateResult {
514            state,
515            migrated_storage_collections_0dt: schema_migration_result.replaced_items,
516            new_builtin_collections: new_builtin_collections.into_iter().collect(),
517            builtin_table_updates,
518            last_seen_version,
519            expr_cache_handle,
520            cached_global_exprs,
521            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
522        })
523    }
524
525    /// Opens or creates a catalog that stores data at `path`.
526    ///
527    /// Returns the catalog, metadata about builtin objects that have changed
528    /// schemas since last restart, a list of updates to builtin tables that
529    /// describe the initial state of the catalog, and the version of the
530    /// catalog before any migrations were performed.
531    ///
532    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 17KB. This would
533    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
534    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
535    #[instrument(name = "catalog::open")]
536    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
537        async move {
538            let mut storage = config.storage;
539
540            let InitializeStateResult {
541                state,
542                migrated_storage_collections_0dt,
543                new_builtin_collections,
544                mut builtin_table_updates,
545                last_seen_version: _,
546                expr_cache_handle,
547                cached_global_exprs,
548                uncached_local_exprs,
549            } =
550                // BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
551                // get stored on the stack which is bad for runtime performance, and blow up our stack usage.
552                // Because of that we purposefully move this Future onto the heap (i.e. Box it).
553                Self::initialize_state(config.state, &mut storage)
554                    .instrument(tracing::info_span!("catalog::initialize_state"))
555                    .boxed()
556                    .await?;
557
558            let catalog = Catalog {
559                state,
560                plans: CatalogPlans::default(),
561                expr_cache_handle,
562                transient_revision: 1,
563                storage: Arc::new(tokio::sync::Mutex::new(storage)),
564            };
565
566            // Operators aren't stored in the catalog, but we would like them in
567            // introspection views.
568            for (op, func) in OP_IMPLS.iter() {
569                match func {
570                    mz_sql::func::Func::Scalar(impls) => {
571                        for imp in impls {
572                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
573                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
574                            ));
575                        }
576                    }
577                    _ => unreachable!("all operators must be scalar functions"),
578                }
579            }
580
581            for ip in &catalog.state.egress_addresses {
582                builtin_table_updates.push(
583                    catalog
584                        .state
585                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
586                );
587            }
588
589            if !catalog.state.license_key.id.is_empty() {
590                builtin_table_updates.push(
591                    catalog.state.resolve_builtin_table_update(
592                        catalog
593                            .state
594                            .pack_license_key_update(&catalog.state.license_key)?,
595                    ),
596                );
597            }
598
599            catalog.storage().await.mark_bootstrap_complete().await;
600
601            Ok(OpenCatalogResult {
602                catalog,
603                migrated_storage_collections_0dt,
604                new_builtin_collections,
605                builtin_table_updates,
606                cached_global_exprs,
607                uncached_local_exprs,
608            })
609        }
610        .instrument(tracing::info_span!("catalog::open"))
611        .boxed()
612    }
613
614    /// Initializes STORAGE to understand all shards that `self` expects to
615    /// exist.
616    ///
617    /// Note that this must be done before creating/rendering collections
618    /// because the storage controller might not be aware of new system
619    /// collections created between versions.
620    async fn initialize_storage_state(
621        &mut self,
622        storage_collections: &Arc<
623            dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
624        >,
625    ) -> Result<(), mz_catalog::durable::CatalogError> {
626        let collections = self
627            .entries()
628            .filter(|entry| entry.item().is_storage_collection())
629            .flat_map(|entry| entry.global_ids())
630            .collect();
631
632        // Clone the state so that any errors that occur do not leak any
633        // transformations on error.
634        let mut state = self.state.clone();
635
636        let mut storage = self.storage().await;
637        let mut txn = storage.transaction().await?;
638
639        storage_collections
640            .initialize_state(&mut txn, collections)
641            .await
642            .map_err(mz_catalog::durable::DurableCatalogError::from)?;
643
644        let updates = txn.get_and_commit_op_updates();
645        let builtin_updates = state.apply_updates(updates)?;
646        assert!(builtin_updates.is_empty());
647        let commit_ts = txn.upper();
648        txn.commit(commit_ts).await?;
649        drop(storage);
650
651        // Save updated state.
652        self.state = state;
653        Ok(())
654    }
655
656    /// [`mz_controller::Controller`] depends on durable catalog state to boot,
657    /// so make it available and initialize the controller.
658    pub async fn initialize_controller(
659        &mut self,
660        config: mz_controller::ControllerConfig,
661        envd_epoch: core::num::NonZeroI64,
662        read_only: bool,
663    ) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
664    {
665        let controller_start = Instant::now();
666        info!("startup: controller init: beginning");
667
668        let controller = {
669            let mut storage = self.storage().await;
670            let mut tx = storage.transaction().await?;
671            mz_controller::prepare_initialization(&mut tx)
672                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
673            let updates = tx.get_and_commit_op_updates();
674            assert!(
675                updates.is_empty(),
676                "initializing controller should not produce updates: {updates:?}"
677            );
678            let commit_ts = tx.upper();
679            tx.commit(commit_ts).await?;
680
681            let read_only_tx = storage.transaction().await?;
682
683            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
684        };
685
686        self.initialize_storage_state(&controller.storage_collections)
687            .await?;
688
689        info!(
690            "startup: controller init: complete in {:?}",
691            controller_start.elapsed()
692        );
693
694        Ok(controller)
695    }
696
697    /// Politely releases all external resources that can only be released in an async context.
698    pub async fn expire(self) {
699        // If no one else holds a reference to storage, then clean up the storage resources.
700        // Otherwise, hopefully the other reference cleans up the resources when it's dropped.
701        if let Some(storage) = Arc::into_inner(self.storage) {
702            let storage = storage.into_inner();
703            storage.expire().await;
704        }
705    }
706}
707
708impl CatalogState {
709    /// Set the default value for `name`, which is the value it will be reset to.
710    fn set_system_configuration_default(
711        &mut self,
712        name: &str,
713        value: VarInput,
714    ) -> Result<(), Error> {
715        Ok(self.system_configuration.set_default(name, value)?)
716    }
717}
718
719/// Updates the catalog with new and removed builtin items.
720///
721/// Returns the list of new builtin [`GlobalId`]s.
722fn add_new_remove_old_builtin_items_migration(
723    builtins_cfg: &BuiltinsConfig,
724    txn: &mut mz_catalog::durable::Transaction<'_>,
725) -> Result<Vec<GlobalId>, mz_catalog::durable::CatalogError> {
726    let mut new_builtin_mappings = Vec::new();
727    // Used to validate unique descriptions.
728    let mut builtin_descs = HashSet::new();
729
730    // We compare the builtin items that are compiled into the binary with the builtin items that
731    // are persisted in the catalog to discover new and deleted builtin items.
732    let mut builtins = Vec::new();
733    for builtin in BUILTINS::iter(builtins_cfg) {
734        let desc = SystemObjectDescription {
735            schema_name: builtin.schema().to_string(),
736            object_type: builtin.catalog_item_type(),
737            object_name: builtin.name().to_string(),
738        };
739        // Validate that the description is unique.
740        if !builtin_descs.insert(desc.clone()) {
741            panic!(
742                "duplicate builtin description: {:?}, {:?}",
743                SystemObjectDescription {
744                    schema_name: builtin.schema().to_string(),
745                    object_type: builtin.catalog_item_type(),
746                    object_name: builtin.name().to_string(),
747                },
748                builtin
749            );
750        }
751        builtins.push((desc, builtin));
752    }
753
754    let mut system_object_mappings: BTreeMap<_, _> = txn
755        .get_system_object_mappings()
756        .map(|system_object_mapping| {
757            (
758                system_object_mapping.description.clone(),
759                system_object_mapping,
760            )
761        })
762        .collect();
763
764    let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
765        builtins.into_iter().partition_map(|(desc, builtin)| {
766            let fingerprint = match builtin.runtime_alterable() {
767                false => builtin.fingerprint(),
768                true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
769            };
770            match system_object_mappings.remove(&desc) {
771                Some(system_object_mapping) => {
772                    Either::Left((builtin, system_object_mapping, fingerprint))
773                }
774                None => Either::Right((builtin, fingerprint)),
775            }
776        });
777    let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
778    let new_builtins: Vec<_> = new_builtins
779        .into_iter()
780        .zip_eq(new_builtin_ids.clone())
781        .collect();
782
783    // Add new builtin items to catalog.
784    for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
785        new_builtin_mappings.push(SystemObjectMapping {
786            description: SystemObjectDescription {
787                schema_name: builtin.schema().to_string(),
788                object_type: builtin.catalog_item_type(),
789                object_name: builtin.name().to_string(),
790            },
791            unique_identifier: SystemObjectUniqueIdentifier {
792                catalog_id,
793                global_id,
794                fingerprint,
795            },
796        });
797
798        // Runtime-alterable system objects are durably recorded to the
799        // usual items collection, so that they can be later altered at
800        // runtime by their owner (i.e., outside of the usual builtin
801        // migration framework that requires changes to the binary
802        // itself).
803        let handled_runtime_alterable = match builtin {
804            Builtin::Connection(c) if c.runtime_alterable => {
805                let mut acl_items = vec![rbac::owner_privilege(
806                    mz_sql::catalog::ObjectType::Connection,
807                    c.owner_id.clone(),
808                )];
809                acl_items.extend_from_slice(c.access);
810                // Builtin Connections cannot be versioned.
811                let versions = BTreeMap::new();
812
813                txn.insert_item(
814                    catalog_id,
815                    c.oid,
816                    global_id,
817                    mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
818                    c.name,
819                    c.sql.into(),
820                    *c.owner_id,
821                    acl_items,
822                    versions,
823                )?;
824                true
825            }
826            _ => false,
827        };
828        assert_eq!(
829            builtin.runtime_alterable(),
830            handled_runtime_alterable,
831            "runtime alterable object was not handled by migration",
832        );
833    }
834    txn.set_system_object_mappings(new_builtin_mappings)?;
835
836    // Update comments of all builtin objects
837    let builtins_with_catalog_ids = existing_builtins
838        .iter()
839        .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
840        .chain(
841            new_builtins
842                .into_iter()
843                .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
844        );
845
846    for (builtin, id) in builtins_with_catalog_ids {
847        let (comment_id, desc, comments) = match builtin {
848            Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
849            Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
850            Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
851            Builtin::Log(_)
852            | Builtin::Type(_)
853            | Builtin::Func(_)
854            | Builtin::ContinualTask(_)
855            | Builtin::Index(_)
856            | Builtin::Connection(_) => continue,
857        };
858        txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;
859
860        let mut comments = comments.clone();
861        for (col_idx, name) in desc.iter_names().enumerate() {
862            if let Some(comment) = comments.remove(name.as_str()) {
863                // Comment column indices are 1 based
864                txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
865            }
866        }
867        assert!(
868            comments.is_empty(),
869            "builtin object contains dangling comments that don't correspond to columns {comments:?}"
870        );
871    }
872
873    // Anything left in `system_object_mappings` must have been deleted and should be removed from
874    // the catalog.
875    let mut deleted_system_objects = BTreeSet::new();
876    let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
877    let mut deleted_comments = BTreeSet::new();
878    for (desc, mapping) in system_object_mappings {
879        deleted_system_objects.insert(mapping.description);
880        if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
881            deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
882        }
883
884        let id = mapping.unique_identifier.catalog_id;
885        let comment_id = match desc.object_type {
886            CatalogItemType::Table => CommentObjectId::Table(id),
887            CatalogItemType::Source => CommentObjectId::Source(id),
888            CatalogItemType::View => CommentObjectId::View(id),
889            CatalogItemType::Sink
890            | CatalogItemType::MaterializedView
891            | CatalogItemType::Index
892            | CatalogItemType::Type
893            | CatalogItemType::Func
894            | CatalogItemType::Secret
895            | CatalogItemType::Connection
896            | CatalogItemType::ContinualTask => continue,
897        };
898        deleted_comments.insert(comment_id);
899    }
900    // If you are 100% positive that it is safe to delete a system object outside any of the
901    // unstable schemas, then add it to this set. Make sure that no prod environments are
902    // using this object and that the upgrade checker does not show any issues.
903    //
904    // Objects can be removed from this set after one release.
905    let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
906    // TODO(jkosh44) Technically we could support changing the type of a builtin object outside
907    // of unstable schemas (i.e. from a table to a view). However, builtin migrations don't currently
908    // handle that scenario correctly.
909    assert!(
910        deleted_system_objects
911            .iter()
912            // It's okay if Indexes change because they're inherently ephemeral.
913            .filter(|object| object.object_type != CatalogItemType::Index)
914            .all(
915                |deleted_object| is_unstable_schema(&deleted_object.schema_name)
916                    || delete_exceptions.contains(deleted_object)
917            ),
918        "only objects in unstable schemas can be deleted, deleted objects: {:?}",
919        deleted_system_objects
920    );
921    txn.drop_comments(&deleted_comments)?;
922    txn.remove_items(&deleted_runtime_alterable_system_ids)?;
923    txn.remove_system_object_mappings(deleted_system_objects)?;
924
925    // Filter down to just the GlobalIds which are used to track the underlying collections.
926    let new_builtin_collections = new_builtin_ids
927        .into_iter()
928        .map(|(_catalog_id, global_id)| global_id)
929        .collect();
930
931    Ok(new_builtin_collections)
932}
933
934fn add_new_remove_old_builtin_clusters_migration(
935    txn: &mut mz_catalog::durable::Transaction<'_>,
936    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
937) -> Result<(), mz_catalog::durable::CatalogError> {
938    let mut durable_clusters: BTreeMap<_, _> = txn
939        .get_clusters()
940        .filter(|cluster| cluster.id.is_system())
941        .map(|cluster| (cluster.name.to_string(), cluster))
942        .collect();
943
944    // Add new clusters.
945    for builtin_cluster in BUILTIN_CLUSTERS {
946        if durable_clusters.remove(builtin_cluster.name).is_none() {
947            let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
948
949            txn.insert_system_cluster(
950                builtin_cluster.name,
951                vec![],
952                builtin_cluster.privileges.to_vec(),
953                builtin_cluster.owner_id.to_owned(),
954                mz_catalog::durable::ClusterConfig {
955                    variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
956                        size: cluster_config.size,
957                        availability_zones: vec![],
958                        replication_factor: cluster_config.replication_factor,
959                        logging: default_logging_config(),
960                        optimizer_feature_overrides: Default::default(),
961                        schedule: Default::default(),
962                    }),
963                    workload_class: None,
964                },
965                &HashSet::new(),
966            )?;
967        }
968    }
969
970    // Remove old clusters.
971    let old_clusters = durable_clusters
972        .values()
973        .map(|cluster| cluster.id)
974        .collect();
975    txn.remove_clusters(&old_clusters)?;
976
977    Ok(())
978}
979
980fn add_new_remove_old_builtin_introspection_source_migration(
981    txn: &mut mz_catalog::durable::Transaction<'_>,
982) -> Result<(), AdapterError> {
983    let mut new_indexes = Vec::new();
984    let mut removed_indexes = BTreeSet::new();
985    for cluster in txn.get_clusters() {
986        let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
987
988        let mut new_logs = Vec::new();
989
990        for log in BUILTINS::logs() {
991            if introspection_source_index_ids.remove(log.name).is_none() {
992                new_logs.push(log);
993            }
994        }
995
996        for log in new_logs {
997            let (item_id, gid) =
998                Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
999            new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1000        }
1001
1002        // Anything left in `introspection_source_index_ids` must have been deleted and should be
1003        // removed from the catalog.
1004        removed_indexes.extend(
1005            introspection_source_index_ids
1006                .into_keys()
1007                .map(|name| (cluster.id, name.to_string())),
1008        );
1009    }
1010    txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1011    txn.remove_introspection_source_indexes(removed_indexes)?;
1012    Ok(())
1013}
1014
1015fn add_new_remove_old_builtin_roles_migration(
1016    txn: &mut mz_catalog::durable::Transaction<'_>,
1017) -> Result<(), mz_catalog::durable::CatalogError> {
1018    let mut durable_roles: BTreeMap<_, _> = txn
1019        .get_roles()
1020        .filter(|role| role.id.is_system() || role.id.is_predefined())
1021        .map(|role| (role.name.to_string(), role))
1022        .collect();
1023
1024    // Add new roles.
1025    for builtin_role in BUILTIN_ROLES {
1026        if durable_roles.remove(builtin_role.name).is_none() {
1027            txn.insert_builtin_role(
1028                builtin_role.id,
1029                builtin_role.name.to_string(),
1030                builtin_role.attributes.clone(),
1031                RoleMembership::new(),
1032                RoleVars::default(),
1033                builtin_role.oid,
1034            )?;
1035        }
1036    }
1037
1038    // Remove old roles.
1039    let old_roles = durable_roles.values().map(|role| role.id).collect();
1040    txn.remove_roles(&old_roles)?;
1041
1042    Ok(())
1043}
1044
1045fn add_new_remove_old_builtin_cluster_replicas_migration(
1046    txn: &mut Transaction<'_>,
1047    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1048) -> Result<(), AdapterError> {
1049    let cluster_lookup: BTreeMap<_, _> = txn
1050        .get_clusters()
1051        .map(|cluster| (cluster.name.clone(), cluster.clone()))
1052        .collect();
1053
1054    let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1055        .get_cluster_replicas()
1056        .filter(|replica| replica.replica_id.is_system())
1057        .fold(BTreeMap::new(), |mut acc, replica| {
1058            acc.entry(replica.cluster_id)
1059                .or_insert_with(BTreeMap::new)
1060                .insert(replica.name.to_string(), replica);
1061            acc
1062        });
1063
1064    // Add new replicas.
1065    for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1066        let cluster = cluster_lookup
1067            .get(builtin_replica.cluster_name)
1068            .expect("builtin cluster replica references non-existent cluster");
1069        // `empty_map` is a hack to simplify the if statement below.
1070        let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1071        let replica_names = durable_replicas
1072            .get_mut(&cluster.id)
1073            .unwrap_or(&mut empty_map);
1074
1075        let builtin_cluster_bootstrap_config =
1076            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1077        if replica_names.remove(builtin_replica.name).is_none()
1078            // NOTE(SangJunBak): We need to explicitly check the replication factor because
1079            // BUILT_IN_CLUSTER_REPLICAS is constant throughout all deployments but the replication
1080            // factor is configurable on bootstrap.
1081            && builtin_cluster_bootstrap_config.replication_factor > 0
1082        {
1083            let replica_size = match cluster.config.variant {
1084                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1085                ClusterVariant::Unmanaged => builtin_cluster_bootstrap_config.size,
1086            };
1087
1088            let config = builtin_cluster_replica_config(replica_size);
1089            txn.insert_cluster_replica(
1090                cluster.id,
1091                builtin_replica.name,
1092                config,
1093                MZ_SYSTEM_ROLE_ID,
1094            )?;
1095        }
1096    }
1097
1098    // Remove old replicas.
1099    let old_replicas = durable_replicas
1100        .values()
1101        .flat_map(|replicas| replicas.values().map(|replica| replica.replica_id))
1102        .collect();
1103    txn.remove_cluster_replicas(&old_replicas)?;
1104
1105    Ok(())
1106}
1107
1108/// Roles can have default values for configuration parameters, e.g. you can set a Role default for
1109/// the 'cluster' parameter.
1110///
1111/// This migration exists to remove the Role default for a configuration parameter, if the persisted
1112/// input is no longer valid. For example if we remove a configuration parameter or change the
1113/// accepted set of values.
1114fn remove_invalid_config_param_role_defaults_migration(
1115    txn: &mut Transaction<'_>,
1116) -> Result<(), AdapterError> {
1117    static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1118
1119    let roles_to_migrate: BTreeMap<_, _> = txn
1120        .get_roles()
1121        .filter_map(|mut role| {
1122            // Create an empty SessionVars just so we can check if a var is valid.
1123            //
1124            // TODO(parkmycar): This is a bit hacky, instead we should have a static list of all
1125            // session variables.
1126            let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1127
1128            // Iterate over all of the variable defaults for this role.
1129            let mut invalid_roles_vars = BTreeMap::new();
1130            for (name, value) in &role.vars.map {
1131                // If one does not exist or its value is invalid, then mark it for removal.
1132                let Ok(session_var) = session_vars.inspect(name) else {
1133                    invalid_roles_vars.insert(name.clone(), value.clone());
1134                    continue;
1135                };
1136                if session_var.check(value.borrow()).is_err() {
1137                    invalid_roles_vars.insert(name.clone(), value.clone());
1138                }
1139            }
1140
1141            // If the role has no invalid values, nothing to do!
1142            if invalid_roles_vars.is_empty() {
1143                return None;
1144            }
1145
1146            tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1147
1148            // Otherwise, remove the variables from the role and return it to be updated.
1149            for (name, _value) in invalid_roles_vars {
1150                role.vars.map.remove(&name);
1151            }
1152            Some(role)
1153        })
1154        .map(|role| (role.id, role))
1155        .collect();
1156
1157    txn.update_roles_without_auth(roles_to_migrate)?;
1158
1159    Ok(())
1160}
1161
1162/// Cluster Replicas may be created ephemerally during an alter statement, these replicas
1163/// are marked as pending and should be cleaned up on catalog opsn.
1164fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
1165    for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1166        if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1167            replica.config.location
1168        {
1169            tx.remove_cluster_replica(replica.replica_id)?;
1170        }
1171    }
1172    Ok(())
1173}
1174
1175pub(crate) fn builtin_cluster_replica_config(
1176    replica_size: String,
1177) -> mz_catalog::durable::ReplicaConfig {
1178    mz_catalog::durable::ReplicaConfig {
1179        location: mz_catalog::durable::ReplicaLocation::Managed {
1180            availability_zone: None,
1181            billed_as: None,
1182            pending: false,
1183            internal: false,
1184            size: replica_size,
1185        },
1186        logging: default_logging_config(),
1187    }
1188}
1189
1190fn default_logging_config() -> ReplicaLogging {
1191    ReplicaLogging {
1192        log_logging: false,
1193        interval: Some(Duration::from_secs(1)),
1194    }
1195}
1196
/// Bootstrap size and replication factor for each builtin cluster.
///
/// Consulted (via `get_config`) when a builtin cluster, or a builtin cluster replica, is missing
/// from the catalog and has to be created.
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Size and replication factor to default system_cluster on bootstrap
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default catalog_server_cluster on bootstrap
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default probe_cluster on bootstrap
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default support_cluster on bootstrap
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default analytics_cluster on bootstrap
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1210
1211impl BuiltinBootstrapClusterConfigMap {
1212    /// Gets the size of the builtin cluster based on the provided name
1213    fn get_config(
1214        &self,
1215        cluster_name: &str,
1216    ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1217        let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1218            &self.system_cluster
1219        } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1220            &self.catalog_server_cluster
1221        } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1222            &self.probe_cluster
1223        } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1224            &self.support_cluster
1225        } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1226            &self.analytics_cluster
1227        } else {
1228            return Err(mz_catalog::durable::CatalogError::Catalog(
1229                SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1230            ));
1231        };
1232        Ok(cluster_config.clone())
1233    }
1234}
1235
1236/// Convert `updates` into a `Vec` that can be consolidated by doing the following:
1237///
1238///   - Convert each update into a type that implements [`std::cmp::Ord`].
1239///   - Update the timestamp of each update to the same value.
1240///   - Convert the diff of each update to a type that implements
1241///     [`differential_dataflow::difference::Semigroup`].
1242///
1243/// [`mz_catalog::memory::objects::StateUpdateKind`] does not implement [`std::cmp::Ord`] only
1244/// because it contains a variant for temporary items, which do not implement [`std::cmp::Ord`].
1245/// However, we know that during bootstrap no temporary items exist, because they are not persisted
1246/// and are only created after bootstrap is complete. So we forcibly convert each
1247/// [`mz_catalog::memory::objects::StateUpdateKind`] into an [`BootstrapStateUpdateKind`], which is
1248/// identical to [`mz_catalog::memory::objects::StateUpdateKind`] except it doesn't have a
1249/// temporary item variant and does implement [`std::cmp::Ord`].
1250///
1251/// WARNING: Do not call outside of startup.
1252pub(crate) fn into_consolidatable_updates_startup(
1253    updates: Vec<StateUpdate>,
1254    ts: Timestamp,
1255) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1256    updates
1257        .into_iter()
1258        .map(|StateUpdate { kind, ts: _, diff }| {
1259            let kind: BootstrapStateUpdateKind = kind
1260                .try_into()
1261                .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1262            (kind, ts, Diff::from(diff))
1263        })
1264        .collect()
1265}
1266
1267fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1268    defaults: &BTreeMap<String, String>,
1269    remote: Option<&BTreeMap<String, String>>,
1270    cfg: &mz_dyncfg::Config<T>,
1271) -> T::ConfigType {
1272    let mut val = T::into_config_type(cfg.default().clone());
1273    let get_fn = |map: &BTreeMap<String, String>| {
1274        let val = map.get(cfg.name())?;
1275        match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1276            Ok(x) => Some(x),
1277            Err(err) => {
1278                tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1279                None
1280            }
1281        }
1282    };
1283    if let Some(x) = get_fn(defaults) {
1284        val = x;
1285    }
1286    if let Some(x) = remote.and_then(get_fn) {
1287        val = x;
1288    }
1289    val
1290}