mz_adapter/catalog/
open.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to opening a [`Catalog`].
11
12mod builtin_schema_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU32;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use futures::future::{BoxFuture, FutureExt};
20use itertools::{Either, Itertools};
21use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
22use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
23use mz_auth::hash::scram256_hash;
24use mz_catalog::SYSTEM_CONN_ID;
25use mz_catalog::builtin::{
26    BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
27    Fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
28};
29use mz_catalog::config::StateConfig;
30use mz_catalog::durable::objects::{
31    SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
32};
33use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
34use mz_catalog::expr_cache::{
35    ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
36};
37use mz_catalog::memory::error::{Error, ErrorKind};
38use mz_catalog::memory::objects::{
39    BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
40};
41use mz_controller::clusters::ReplicaLogging;
42use mz_controller_types::ClusterId;
43use mz_ore::cast::usize_to_u64;
44use mz_ore::collections::HashSet;
45use mz_ore::now::{SYSTEM_TIME, to_datetime};
46use mz_ore::{instrument, soft_assert_no_log};
47use mz_repr::adt::mz_acl_item::PrivilegeMap;
48use mz_repr::namespaces::is_unstable_schema;
49use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
50use mz_sql::catalog::{
51    BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
52};
53use mz_sql::func::OP_IMPLS;
54use mz_sql::names::CommentObjectId;
55use mz_sql::rbac;
56use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
57use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
58use mz_storage_client::storage_collections::StorageCollections;
59use tracing::{Instrument, info, warn};
60use uuid::Uuid;
61
62// DO NOT add any more imports from `crate` outside of `crate::catalog`.
63use crate::AdapterError;
64use crate::catalog::migrate::{self, get_migration_version, set_migration_version};
65use crate::catalog::state::LocalExpressionCache;
66use crate::catalog::{
67    BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name,
68};
69
70pub struct InitializeStateResult {
71    /// An initialized [`CatalogState`].
72    pub state: CatalogState,
73    /// A set of new shards that may need to be initialized (only used by 0dt migration).
74    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
75    /// A set of new builtin items.
76    pub new_builtin_collections: BTreeSet<GlobalId>,
77    /// A list of builtin table updates corresponding to the initialized state.
78    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
79    /// The version of the catalog that existed before initializing the catalog.
80    pub last_seen_version: String,
81    /// A handle to the expression cache if it's enabled.
82    pub expr_cache_handle: Option<ExpressionCacheHandle>,
83    /// The global expressions that were cached in `expr_cache_handle`.
84    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
85    /// The local expressions that were NOT cached in `expr_cache_handle`.
86    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
87}
88
89pub struct OpenCatalogResult {
90    /// An opened [`Catalog`].
91    pub catalog: Catalog,
92    /// A set of new shards that may need to be initialized.
93    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
94    /// A set of new builtin items.
95    pub new_builtin_collections: BTreeSet<GlobalId>,
96    /// A list of builtin table updates corresponding to the initialized state.
97    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
98    /// The global expressions that were cached in the expression cache.
99    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
100    /// The local expressions that were NOT cached in the expression cache.
101    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
102}
103
104impl Catalog {
    /// Initializes a [`CatalogState`] from the durable catalog. Separate from
    /// [`Catalog::open`] to avoid depending on state external to a
    /// [mz_catalog::durable::DurableCatalogState] (for example: no [mz_secrets::SecretsReader]).
    ///
    /// Most of the work happens inside a single durable transaction on `storage`:
    /// 1. Build an empty in-memory [`CatalogState`] from `config`.
    /// 2. Run durable migrations, persist any remote system parameters, and add/remove
    ///    builtin items, clusters, introspection sources, cluster replicas, and roles.
    /// 3. Consolidate the durable snapshot plus the migration updates and apply them to the
    ///    in-memory state in phases: pre-item updates (roles, clusters, system config, ...),
    ///    system items, user items (migrated unless `config.skip_migrations` is set), then
    ///    post-item updates. The expression cache is opened after the pre-item phase so that
    ///    it can read the durable system configuration.
    /// 4. Run the builtin schema migration, apply its updates, bump the migration version,
    ///    and commit the transaction at `config.boot_ts`.
    ///
    /// Audit log updates are never applied in memory; only their builtin table updates are
    /// generated.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from or committing to `storage` fails, if any migration
    /// fails, if a system parameter default cannot be set (an unknown parameter is only
    /// logged), if the `mz_system` password cannot be hashed, or if the authentication mock
    /// nonce is missing.
    ///
    /// # Panics
    ///
    /// Panics if a builtin role or cluster name is not reserved, if the initial durable
    /// snapshot is empty, or if other startup invariants checked with `expect` do not hold.
    pub async fn initialize_state<'a>(
        config: StateConfig,
        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
    ) -> Result<InitializeStateResult, AdapterError> {
        // Sanity-check that every builtin role and cluster uses a reserved name prefix.
        for builtin_role in BUILTIN_ROLES {
            assert!(
                is_reserved_name(builtin_role.name),
                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }
        for builtin_cluster in BUILTIN_CLUSTERS {
            assert!(
                is_reserved_name(builtin_cluster.name),
                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }

        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
        if config.all_features {
            system_configuration.enable_all_feature_flags_by_default();
        }

        // Start from an empty in-memory catalog; everything below populates it.
        let mut state = CatalogState {
            database_by_name: BTreeMap::new(),
            database_by_id: BTreeMap::new(),
            entry_by_id: BTreeMap::new(),
            entry_by_global_id: BTreeMap::new(),
            ambient_schemas_by_name: BTreeMap::new(),
            ambient_schemas_by_id: BTreeMap::new(),
            clusters_by_name: BTreeMap::new(),
            clusters_by_id: BTreeMap::new(),
            roles_by_name: BTreeMap::new(),
            roles_by_id: BTreeMap::new(),
            network_policies_by_id: BTreeMap::new(),
            role_auth_by_id: BTreeMap::new(),
            network_policies_by_name: BTreeMap::new(),
            system_configuration,
            default_privileges: DefaultPrivileges::default(),
            system_privileges: PrivilegeMap::default(),
            comments: CommentsMap::default(),
            source_references: BTreeMap::new(),
            storage_metadata: Default::default(),
            temporary_schemas: BTreeMap::new(),
            mock_authentication_nonce: Default::default(),
            config: mz_sql::catalog::CatalogConfig {
                start_time: to_datetime((config.now)()),
                start_instant: Instant::now(),
                nonce: rand::random(),
                environment_id: config.environment_id,
                session_id: Uuid::new_v4(),
                build_info: config.build_info,
                timestamp_interval: Duration::from_secs(1),
                now: config.now.clone(),
                connection_context: config.connection_context,
                builtins_cfg: BuiltinsConfig {
                    // This will fall back to the default in code (false) if we timeout on the
                    // initial LD sync. We're just using this to get some additional testing in on
                    // CTs so a false negative is fine, we're only worried about false positives.
                    include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
                        &config.system_parameter_defaults,
                        config.remote_system_parameters.as_ref(),
                        &ENABLE_CONTINUAL_TASK_BUILTINS,
                    ),
                },
                helm_chart_version: config.helm_chart_version,
            },
            cluster_replica_sizes: config.cluster_replica_sizes,
            availability_zones: config.availability_zones,
            egress_addresses: config.egress_addresses,
            aws_principal_context: config.aws_principal_context,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            http_host_name: config.http_host_name,
            license_key: config.license_key,
        };

        // Read up front; consumed later by the builtin schema migration.
        let deploy_generation = storage.get_deployment_generation().await?;

        // Snapshot of the durable catalog. Updates produced by the migrations below are
        // appended to it before anything is applied to the in-memory state.
        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
        let mut txn = storage.transaction().await?;

        // Migrate/update durable data before we start loading the in-memory catalog.
        let new_builtin_collections = {
            migrate::durable_migrate(
                &mut txn,
                state.config.environment_id.organization_id(),
                config.boot_ts,
            )?;
            // Overwrite and persist selected parameter values in `remote_system_parameters` that
            // was pulled from a remote frontend (e.g. LaunchDarkly) if present.
            if let Some(remote_system_parameters) = config.remote_system_parameters {
                for (name, value) in remote_system_parameters {
                    txn.upsert_system_config(&name, value)?;
                }
                txn.set_system_config_synced_once()?;
            }
            // Add any new builtin objects and remove old ones.
            let new_builtin_collections =
                add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
                system_cluster: config.builtin_system_cluster_config,
                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
                probe_cluster: config.builtin_probe_cluster_config,
                support_cluster: config.builtin_support_cluster_config,
                analytics_cluster: config.builtin_analytics_cluster_config,
            };
            add_new_remove_old_builtin_clusters_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
            )?;
            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
            add_new_remove_old_builtin_cluster_replicas_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
            )?;
            add_new_remove_old_builtin_roles_migration(&mut txn)?;
            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
            remove_pending_cluster_replicas_migration(&mut txn)?;

            new_builtin_collections
        };

        // Fold the changes made by the migrations above into the durable snapshot.
        let op_updates = txn.get_and_commit_op_updates();
        updates.extend(op_updates);

        // Accumulates builtin table updates from every phase below.
        let mut builtin_table_updates = Vec::new();

        // Seed the in-memory catalog with values that don't come from the durable catalog.
        {
            // Set defaults from configuration passed in the provided `system_parameter_defaults`
            // map.
            for (name, value) in config.system_parameter_defaults {
                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
                    Ok(_) => (),
                    // Unknown parameters are logged and skipped rather than failing startup.
                    Err(Error {
                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
                    }) => {
                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
                    }
                    Err(e) => return Err(e.into()),
                };
            }
            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
        }

        // Make life easier by consolidating all updates, so that we end up with only positive
        // diffs.
        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
        differential_dataflow::consolidation::consolidate_updates(&mut updates);
        soft_assert_no_log!(
            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "consolidated updates should be positive during startup: {updates:?}"
        );

        // Partition updates by kind so they can be applied in separate phases below
        // (pre-item, system items, items, post-item). `post_item_updates` stay as raw
        // `(kind, ts, diff)` tuples because migrations may append to them and they are
        // re-consolidated before being applied. Audit log updates are only turned into
        // builtin table updates.
        let mut pre_item_updates = Vec::new();
        let mut system_item_updates = Vec::new();
        let mut item_updates = Vec::new();
        let mut post_item_updates = Vec::new();
        let mut audit_log_updates = Vec::new();
        for (kind, ts, diff) in updates {
            match kind {
                BootstrapStateUpdateKind::Role(_)
                | BootstrapStateUpdateKind::RoleAuth(_)
                | BootstrapStateUpdateKind::Database(_)
                | BootstrapStateUpdateKind::Schema(_)
                | BootstrapStateUpdateKind::DefaultPrivilege(_)
                | BootstrapStateUpdateKind::SystemPrivilege(_)
                | BootstrapStateUpdateKind::SystemConfiguration(_)
                | BootstrapStateUpdateKind::Cluster(_)
                | BootstrapStateUpdateKind::NetworkPolicy(_)
                | BootstrapStateUpdateKind::ClusterReplica(_) => {
                    pre_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
                    system_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
                    kind: kind.into(),
                    ts,
                    diff: diff.try_into().expect("valid diff"),
                }),
                BootstrapStateUpdateKind::Comment(_)
                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
                | BootstrapStateUpdateKind::SourceReferences(_)
                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
                    post_item_updates.push((kind, ts, diff));
                }
                BootstrapStateUpdateKind::AuditLog(_) => {
                    audit_log_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    });
                }
            }
        }

        // Phase 1: pre-item updates. The expression cache is not open yet, hence `Closed`.
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // Ensure mz_system has a password if configured to have one.
        // It's important we do this after the `pre_item_updates` so that
        // the mz_system role exists in the catalog.
        {
            if let Some(password) = config.external_login_password_mz_system {
                let role_auth = RoleAuth {
                    role_id: MZ_SYSTEM_ROLE_ID,
                    // builtin roles should always use a secure scram iteration
                    // <https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html>
                    password_hash: Some(
                        scram256_hash(&password, &NonZeroU32::new(600_000).expect("known valid"))
                            .map_err(|_| {
                            AdapterError::Internal("Failed to hash mz_system password.".to_owned())
                        })?,
                    ),
                    updated_at: SYSTEM_TIME(),
                };
                // Only the in-memory state and builtin tables are updated here; the
                // durable transaction is not touched by this block.
                state
                    .role_auth_by_id
                    .insert(MZ_SYSTEM_ROLE_ID, role_auth.clone());
                let builtin_table_update = state.generate_builtin_table_update(
                    mz_catalog::memory::objects::StateUpdateKind::RoleAuth(role_auth.into()),
                    mz_catalog::memory::objects::StateDiff::Addition,
                );
                builtin_table_updates.extend(builtin_table_update);
            }
        }

        let expr_cache_start = Instant::now();
        info!("startup: coordinator init: catalog open: expr cache open beginning");
        // We wait until after the `pre_item_updates` to open the cache so we can get accurate
        // dyncfgs because the `pre_item_updates` contains `SystemConfiguration` updates.
        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
        // An explicit override in the config takes precedence over the dyncfg.
        let expr_cache_enabled = config
            .enable_expression_cache_override
            .unwrap_or(enable_expr_cache_dyncfg);
        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
            info!(
                ?config.enable_expression_cache_override,
                ?enable_expr_cache_dyncfg,
                "using expression cache for startup"
            );
            // All global IDs currently known to the transaction: every version of every item
            // plus every system object mapping.
            let current_ids = txn
                .get_items()
                .flat_map(|item| {
                    let gid = item.global_id.clone();
                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
                    std::iter::once(gid).chain(gids.into_iter())
                })
                .chain(
                    txn.get_system_object_mappings()
                        .map(|som| som.unique_identifier.global_id),
                )
                .collect();
            let dyncfgs = config.persist_client.dyncfgs().clone();
            let build_version = if config.build_info.is_dev() {
                // A single dev version can be used for many different builds, so we need to use
                // the build version that is also enriched with build metadata.
                config
                    .build_info
                    .semver_version_build()
                    .expect("build ID is not available on your platform!")
            } else {
                config.build_info.semver_version()
            };
            let expr_cache_config = ExpressionCacheConfig {
                build_version,
                shard_id: txn
                    .get_expression_cache_shard()
                    .expect("expression cache shard should exist for opened catalogs"),
                persist: config.persist_client,
                current_ids,
                // Note: these two flags are driven by `config.read_only` in opposite directions.
                remove_prior_versions: !config.read_only,
                compact_shard: config.read_only,
                dyncfgs,
            };
            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
            (
                Some(expr_cache_handle),
                cached_local_exprs,
                cached_global_exprs,
            )
        } else {
            (None, BTreeMap::new(), BTreeMap::new())
        };
        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
        info!(
            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
            expr_cache_start.elapsed()
        );

        // Phase 2: system items (introspection source indexes and system object mappings).
        // When initializing/bootstrapping, we don't use the catalog updates but
        // instead load the catalog fully and then go ahead and apply commands
        // to the controller(s). Maybe we _should_ instead use the same logic
        // and return and use the updates from here. But that's at the very
        // least future work.
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // Version recorded by the previous boot, or "new" if none was recorded.
        let last_seen_version =
            get_migration_version(&txn).map_or_else(|| "new".into(), |v| v.to_string());

        let mz_authentication_mock_nonce =
            txn.get_authentication_mock_nonce().ok_or_else(|| {
                Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
            })?;

        state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);

        // Phase 3: user items.
        // Migrate item ASTs.
        let (builtin_table_update, _catalog_updates) = if !config.skip_migrations {
            let migrate_result = migrate::migrate(
                &mut state,
                &mut txn,
                &mut local_expr_cache,
                item_updates,
                config.now,
                config.boot_ts,
            )
            .await
            .map_err(|e| {
                Error::new(ErrorKind::FailedCatalogMigration {
                    last_seen_version: last_seen_version.clone(),
                    this_version: config.build_info.version,
                    cause: e.to_string(),
                })
            })?;
            if !migrate_result.post_item_updates.is_empty() {
                // Include any post-item-updates generated by migrations, and then consolidate
                // them to ensure diffs are all positive.
                post_item_updates.extend(migrate_result.post_item_updates);
                // Push everything to the same timestamp so it consolidates cleanly.
                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
                    for (_, ts, _) in &mut post_item_updates {
                        *ts = max_ts;
                    }
                }
                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
            }

            (
                migrate_result.builtin_table_updates,
                migrate_result.catalog_updates,
            )
        } else {
            // Migrations are skipped: apply the item updates as they are.
            state
                .apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
                .await
        };
        builtin_table_updates.extend(builtin_table_update);

        // Phase 4: post-item updates (comments, storage collection metadata, source
        // references, unfinalized shards).
        let post_item_updates = post_item_updates
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate {
                kind: kind.into(),
                ts,
                diff: diff.try_into().expect("valid diff"),
            })
            .collect();
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // We don't need to apply the audit logs in memory, yet apply can be expensive when the
        // audit log grows large. Therefore, we skip the apply step and just generate the builtin
        // updates.
        for audit_log_update in audit_log_updates {
            builtin_table_updates.extend(
                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
            );
        }

        // Migrate builtin items.
        let schema_migration_result = builtin_schema_migration::run(
            config.build_info,
            deploy_generation,
            &mut txn,
            config.builtin_item_migration_config,
        )
        .await?;

        // Collect the durable changes made by the builtin schema migration.
        let state_updates = txn.get_and_commit_op_updates();

        // When initializing/bootstrapping, we don't use the catalog updates but
        // instead load the catalog fully and then go ahead and apply commands
        // to the controller(s). Maybe we _should_ instead use the same logic
        // and return and use the updates from here. But that's at the very
        // least future work.
        let (table_updates, _catalog_updates) = state
            .apply_updates_for_bootstrap(state_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(table_updates);
        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);

        // Bump the migration version immediately before committing.
        set_migration_version(&mut txn, config.build_info.semver_version())?;

        txn.commit(config.boot_ts).await?;

        // Now that the migration is durable, run any requested deferred cleanup.
        schema_migration_result.cleanup_action.await;

        Ok(InitializeStateResult {
            state,
            migrated_storage_collections_0dt: schema_migration_result.replaced_items,
            new_builtin_collections: new_builtin_collections.into_iter().collect(),
            builtin_table_updates,
            last_seen_version,
            expr_cache_handle,
            cached_global_exprs,
            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
        })
    }
535            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
536        })
537    }
538
539    /// Opens or creates a catalog that stores data at `path`.
540    ///
541    /// Returns the catalog, metadata about builtin objects that have changed
542    /// schemas since last restart, a list of updates to builtin tables that
543    /// describe the initial state of the catalog, and the version of the
544    /// catalog before any migrations were performed.
545    ///
546    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 17KB. This would
547    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
548    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
549    #[instrument(name = "catalog::open")]
550    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
551        async move {
552            let mut storage = config.storage;
553
554            let InitializeStateResult {
555                state,
556                migrated_storage_collections_0dt,
557                new_builtin_collections,
558                mut builtin_table_updates,
559                last_seen_version: _,
560                expr_cache_handle,
561                cached_global_exprs,
562                uncached_local_exprs,
563            } =
564                // BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
565                // get stored on the stack which is bad for runtime performance, and blow up our stack usage.
566                // Because of that we purposefully move this Future onto the heap (i.e. Box it).
567                Self::initialize_state(config.state, &mut storage)
568                    .instrument(tracing::info_span!("catalog::initialize_state"))
569                    .boxed()
570                    .await?;
571
572            let catalog = Catalog {
573                state,
574                plans: CatalogPlans::default(),
575                expr_cache_handle,
576                transient_revision: 1,
577                storage: Arc::new(tokio::sync::Mutex::new(storage)),
578            };
579
580            // Operators aren't stored in the catalog, but we would like them in
581            // introspection views.
582            for (op, func) in OP_IMPLS.iter() {
583                match func {
584                    mz_sql::func::Func::Scalar(impls) => {
585                        for imp in impls {
586                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
587                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
588                            ));
589                        }
590                    }
591                    _ => unreachable!("all operators must be scalar functions"),
592                }
593            }
594
595            for ip in &catalog.state.egress_addresses {
596                builtin_table_updates.push(
597                    catalog
598                        .state
599                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
600                );
601            }
602
603            if !catalog.state.license_key.id.is_empty() {
604                builtin_table_updates.push(
605                    catalog.state.resolve_builtin_table_update(
606                        catalog
607                            .state
608                            .pack_license_key_update(&catalog.state.license_key)?,
609                    ),
610                );
611            }
612
613            catalog.storage().await.mark_bootstrap_complete().await;
614
615            Ok(OpenCatalogResult {
616                catalog,
617                migrated_storage_collections_0dt,
618                new_builtin_collections,
619                builtin_table_updates,
620                cached_global_exprs,
621                uncached_local_exprs,
622            })
623        }
624        .instrument(tracing::info_span!("catalog::open"))
625        .boxed()
626    }
627
    /// Initializes STORAGE to understand all shards that `self` expects to
    /// exist.
    ///
    /// Note that this must be done before creating/rendering collections
    /// because the storage controller might not be aware of new system
    /// collections created between versions.
    ///
    /// The global ids of every catalog entry whose item is a storage
    /// collection are handed to `storage_collections`. It receives a mutable
    /// durable catalog transaction and may record changes in it. Those changes
    /// are applied to a clone of the in-memory state, and the transaction is
    /// committed. The clone replaces `self.state` only after everything has
    /// succeeded.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following fails:
    /// - opening or committing the transaction;
    /// - initializing the storage collections' state;
    /// - applying the resulting updates.
    ///
    /// On error, `self.state` is left unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the updates produced by storage would change builtin tables,
    /// the catalog, or controller state.
    async fn initialize_storage_state(
        &mut self,
        storage_collections: &Arc<
            dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
        >,
    ) -> Result<(), mz_catalog::durable::CatalogError> {
        // Every global id backing a storage collection known to the catalog.
        let collections = self
            .entries()
            .filter(|entry| entry.item().is_storage_collection())
            .flat_map(|entry| entry.global_ids())
            .collect();

        // Clone the state so that any errors that occur do not leak any
        // transformations on error.
        let mut state = self.state.clone();

        let mut storage = self.storage().await;
        let mut txn = storage.transaction().await?;

        storage_collections
            .initialize_state(&mut txn, collections)
            .await
            .map_err(mz_catalog::durable::DurableCatalogError::from)?;

        // Apply whatever storage wrote into the transaction to the cloned
        // state. Storage must not cause any builtin-table or catalog/controller
        // changes.
        let updates = txn.get_and_commit_op_updates();
        let (builtin_updates, catalog_updates) = state.apply_updates(updates)?;
        assert!(
            builtin_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would cause changes to builtin tables"
        );
        assert!(
            catalog_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would change the catalog or controller state"
        );
        let commit_ts = txn.upper();
        txn.commit(commit_ts).await?;
        // Explicitly release the storage handle before updating `self.state`.
        drop(storage);

        // Save updated state.
        self.state = state;
        Ok(())
    }
676
    /// [`mz_controller::Controller`] depends on durable catalog state to boot,
    /// so make it available and initialize the controller.
    ///
    /// Steps:
    /// 1. Run [`mz_controller::prepare_initialization`] in a durable catalog
    ///    transaction and commit it. That step must not produce any catalog
    ///    updates.
    /// 2. Open a second transaction. It is passed by shared reference to
    ///    [`mz_controller::Controller::new`], which builds the controller.
    /// 3. Initialize storage state for every storage collection known to the
    ///    catalog, using the new controller's storage collections.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following fails:
    /// - opening or committing a transaction;
    /// - `prepare_initialization`;
    /// - initializing storage state.
    ///
    /// # Panics
    ///
    /// Panics if `prepare_initialization` produces catalog updates.
    pub async fn initialize_controller(
        &mut self,
        config: mz_controller::ControllerConfig,
        envd_epoch: core::num::NonZeroI64,
        read_only: bool,
    ) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
    {
        let controller_start = Instant::now();
        info!("startup: controller init: beginning");

        let controller = {
            let mut storage = self.storage().await;
            let mut tx = storage.transaction().await?;
            mz_controller::prepare_initialization(&mut tx)
                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            let updates = tx.get_and_commit_op_updates();
            assert!(
                updates.is_empty(),
                "initializing controller should not produce updates: {updates:?}"
            );
            let commit_ts = tx.upper();
            tx.commit(commit_ts).await?;

            // A fresh transaction, only lent (by shared reference) to the
            // controller constructor; it is never committed here.
            let read_only_tx = storage.transaction().await?;

            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
        };

        self.initialize_storage_state(&controller.storage_collections)
            .await?;

        info!(
            "startup: controller init: complete in {:?}",
            controller_start.elapsed()
        );

        Ok(controller)
    }
717
718    /// Politely releases all external resources that can only be released in an async context.
719    pub async fn expire(self) {
720        // If no one else holds a reference to storage, then clean up the storage resources.
721        // Otherwise, hopefully the other reference cleans up the resources when it's dropped.
722        if let Some(storage) = Arc::into_inner(self.storage) {
723            let storage = storage.into_inner();
724            storage.expire().await;
725        }
726    }
727}
728
729impl CatalogState {
730    /// Set the default value for `name`, which is the value it will be reset to.
731    fn set_system_configuration_default(
732        &mut self,
733        name: &str,
734        value: VarInput,
735    ) -> Result<(), Error> {
736        Ok(self.system_configuration.set_default(name, value)?)
737    }
738}
739
/// Updates the catalog with new and removed builtin items.
///
/// Compares the builtin items compiled into this binary (as selected by `builtins_cfg`) with
/// the system object mappings persisted in `txn`. Items are matched by
/// [`SystemObjectDescription`] (schema name, item type, and item name). Then:
///
/// - Every builtin without a persisted mapping is allocated a new system item id and gets a new
///   system object mapping. Runtime-alterable builtins are also written to the durable items
///   collection so that they can be altered at runtime; only connections are handled here.
/// - For every builtin source, view, and table (new or existing), the object's existing comments
///   are dropped and its column comments are re-written from the builtin definition.
/// - Every persisted mapping without a matching builtin is removed. Its comments (for tables,
///   sources, and views) are removed too, and, if it was runtime-alterable, so is its durable item.
///
/// Returns the list of new builtin [`GlobalId`]s.
///
/// # Errors
///
/// Propagates any error returned by the durable catalog transaction.
///
/// # Panics
///
/// - If two builtins share the same description.
/// - If a runtime-alterable builtin is not handled by this migration.
/// - If a builtin declares a column comment for a column it doesn't have.
/// - If a deleted builtin meets all of the following: it is not an index, it lives outside the
///   unstable schemas, and it is not listed in `delete_exceptions`.
fn add_new_remove_old_builtin_items_migration(
    builtins_cfg: &BuiltinsConfig,
    txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<Vec<GlobalId>, mz_catalog::durable::CatalogError> {
    let mut new_builtin_mappings = Vec::new();
    // Used to validate unique descriptions.
    let mut builtin_descs = HashSet::new();

    // We compare the builtin items that are compiled into the binary with the builtin items that
    // are persisted in the catalog to discover new and deleted builtin items.
    let mut builtins = Vec::new();
    for builtin in BUILTINS::iter(builtins_cfg) {
        let desc = SystemObjectDescription {
            schema_name: builtin.schema().to_string(),
            object_type: builtin.catalog_item_type(),
            object_name: builtin.name().to_string(),
        };
        // Validate that the description is unique.
        if !builtin_descs.insert(desc.clone()) {
            panic!(
                "duplicate builtin description: {:?}, {:?}",
                SystemObjectDescription {
                    schema_name: builtin.schema().to_string(),
                    object_type: builtin.catalog_item_type(),
                    object_name: builtin.name().to_string(),
                },
                builtin
            );
        }
        builtins.push((desc, builtin));
    }

    // Persisted mappings keyed by description. Entries are removed below as they are matched to
    // a compiled-in builtin, so whatever remains afterwards belongs to deleted builtins.
    let mut system_object_mappings: BTreeMap<_, _> = txn
        .get_system_object_mappings()
        .map(|system_object_mapping| {
            (
                system_object_mapping.description.clone(),
                system_object_mapping,
            )
        })
        .collect();

    // Split builtins into those that already have a persisted mapping (Left) and new ones
    // (Right), pairing each with the fingerprint to record for it.
    let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
        builtins.into_iter().partition_map(|(desc, builtin)| {
            // Runtime-alterable builtins record a sentinel instead of their real fingerprint.
            let fingerprint = match builtin.runtime_alterable() {
                false => builtin.fingerprint(),
                true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
            };
            match system_object_mappings.remove(&desc) {
                Some(system_object_mapping) => {
                    Either::Left((builtin, system_object_mapping, fingerprint))
                }
                None => Either::Right((builtin, fingerprint)),
            }
        });
    // One (CatalogItemId, GlobalId) pair per new builtin, in the same order as `new_builtins`.
    let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
    let new_builtins: Vec<_> = new_builtins
        .into_iter()
        .zip_eq(new_builtin_ids.clone())
        .collect();

    // Add new builtin items to catalog.
    for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
        new_builtin_mappings.push(SystemObjectMapping {
            description: SystemObjectDescription {
                schema_name: builtin.schema().to_string(),
                object_type: builtin.catalog_item_type(),
                object_name: builtin.name().to_string(),
            },
            unique_identifier: SystemObjectUniqueIdentifier {
                catalog_id,
                global_id,
                fingerprint,
            },
        });

        // Runtime-alterable system objects are durably recorded to the
        // usual items collection, so that they can be later altered at
        // runtime by their owner (i.e., outside of the usual builtin
        // migration framework that requires changes to the binary
        // itself).
        let handled_runtime_alterable = match builtin {
            Builtin::Connection(c) if c.runtime_alterable => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    c.owner_id.clone(),
                )];
                acl_items.extend_from_slice(c.access);
                // Builtin Connections cannot be versioned.
                let versions = BTreeMap::new();

                txn.insert_item(
                    catalog_id,
                    c.oid,
                    global_id,
                    mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
                    c.name,
                    c.sql.into(),
                    *c.owner_id,
                    acl_items,
                    versions,
                )?;
                true
            }
            _ => false,
        };
        // Guards against adding a new kind of runtime-alterable builtin without handling it above.
        assert_eq!(
            builtin.runtime_alterable(),
            handled_runtime_alterable,
            "runtime alterable object was not handled by migration",
        );
    }
    txn.set_system_object_mappings(new_builtin_mappings)?;

    // Update comments of all builtin objects
    let builtins_with_catalog_ids = existing_builtins
        .iter()
        .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
        .chain(
            new_builtins
                .into_iter()
                .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
        );

    for (builtin, id) in builtins_with_catalog_ids {
        // Only sources, views, and tables carry column comments.
        let (comment_id, desc, comments) = match builtin {
            Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
            Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
            Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
            Builtin::Log(_)
            | Builtin::Type(_)
            | Builtin::Func(_)
            | Builtin::ContinualTask(_)
            | Builtin::Index(_)
            | Builtin::Connection(_) => continue,
        };
        // Drop the object's existing comments before re-adding the builtin's column comments.
        txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;

        // Matched entries are removed, so any leftovers are comments on columns that do not
        // exist and trip the assertion below.
        let mut comments = comments.clone();
        for (col_idx, name) in desc.iter_names().enumerate() {
            if let Some(comment) = comments.remove(name.as_str()) {
                // Comment column indices are 1 based
                txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
            }
        }
        assert!(
            comments.is_empty(),
            "builtin object contains dangling comments that don't correspond to columns {comments:?}"
        );
    }

    // Anything left in `system_object_mappings` must have been deleted and should be removed from
    // the catalog.
    let mut deleted_system_objects = BTreeSet::new();
    let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
    let mut deleted_comments = BTreeSet::new();
    for (desc, mapping) in system_object_mappings {
        deleted_system_objects.insert(mapping.description);
        // Runtime-alterable objects were also written to the items collection; remove them there too.
        if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
            deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
        }

        let id = mapping.unique_identifier.catalog_id;
        let comment_id = match desc.object_type {
            CatalogItemType::Table => CommentObjectId::Table(id),
            CatalogItemType::Source => CommentObjectId::Source(id),
            CatalogItemType::View => CommentObjectId::View(id),
            CatalogItemType::Sink
            | CatalogItemType::MaterializedView
            | CatalogItemType::Index
            | CatalogItemType::Type
            | CatalogItemType::Func
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => continue,
        };
        deleted_comments.insert(comment_id);
    }
    // If you are 100% positive that it is safe to delete a system object outside any of the
    // unstable schemas, then add it to this set. Make sure that no prod environments are
    // using this object and that the upgrade checker does not show any issues.
    //
    // Objects can be removed from this set after one release.
    let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
    // TODO(jkosh44) Technically we could support changing the type of a builtin object outside
    // of unstable schemas (i.e. from a table to a view). However, builtin migrations don't currently
    // handle that scenario correctly.
    assert!(
        deleted_system_objects
            .iter()
            // It's okay if Indexes change because they're inherently ephemeral.
            .filter(|object| object.object_type != CatalogItemType::Index)
            .all(
                |deleted_object| is_unstable_schema(&deleted_object.schema_name)
                    || delete_exceptions.contains(deleted_object)
            ),
        "only objects in unstable schemas can be deleted, deleted objects: {:?}",
        deleted_system_objects
    );
    txn.drop_comments(&deleted_comments)?;
    txn.remove_items(&deleted_runtime_alterable_system_ids)?;
    txn.remove_system_object_mappings(deleted_system_objects)?;

    // Filter down to just the GlobalIds which are used to track the underlying collections.
    let new_builtin_collections = new_builtin_ids
        .into_iter()
        .map(|(_catalog_id, global_id)| global_id)
        .collect();

    Ok(new_builtin_collections)
}
954
955fn add_new_remove_old_builtin_clusters_migration(
956    txn: &mut mz_catalog::durable::Transaction<'_>,
957    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
958) -> Result<(), mz_catalog::durable::CatalogError> {
959    let mut durable_clusters: BTreeMap<_, _> = txn
960        .get_clusters()
961        .filter(|cluster| cluster.id.is_system())
962        .map(|cluster| (cluster.name.to_string(), cluster))
963        .collect();
964
965    // Add new clusters.
966    for builtin_cluster in BUILTIN_CLUSTERS {
967        if durable_clusters.remove(builtin_cluster.name).is_none() {
968            let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
969
970            txn.insert_system_cluster(
971                builtin_cluster.name,
972                vec![],
973                builtin_cluster.privileges.to_vec(),
974                builtin_cluster.owner_id.to_owned(),
975                mz_catalog::durable::ClusterConfig {
976                    variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
977                        size: cluster_config.size,
978                        availability_zones: vec![],
979                        replication_factor: cluster_config.replication_factor,
980                        logging: default_logging_config(),
981                        optimizer_feature_overrides: Default::default(),
982                        schedule: Default::default(),
983                    }),
984                    workload_class: None,
985                },
986                &HashSet::new(),
987            )?;
988        }
989    }
990
991    // Remove old clusters.
992    let old_clusters = durable_clusters
993        .values()
994        .map(|cluster| cluster.id)
995        .collect();
996    txn.remove_clusters(&old_clusters)?;
997
998    Ok(())
999}
1000
1001fn add_new_remove_old_builtin_introspection_source_migration(
1002    txn: &mut mz_catalog::durable::Transaction<'_>,
1003) -> Result<(), AdapterError> {
1004    let mut new_indexes = Vec::new();
1005    let mut removed_indexes = BTreeSet::new();
1006    for cluster in txn.get_clusters() {
1007        let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
1008
1009        let mut new_logs = Vec::new();
1010
1011        for log in BUILTINS::logs() {
1012            if introspection_source_index_ids.remove(log.name).is_none() {
1013                new_logs.push(log);
1014            }
1015        }
1016
1017        for log in new_logs {
1018            let (item_id, gid) =
1019                Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1020            new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1021        }
1022
1023        // Anything left in `introspection_source_index_ids` must have been deleted and should be
1024        // removed from the catalog.
1025        removed_indexes.extend(
1026            introspection_source_index_ids
1027                .into_keys()
1028                .map(|name| (cluster.id, name.to_string())),
1029        );
1030    }
1031    txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1032    txn.remove_introspection_source_indexes(removed_indexes)?;
1033    Ok(())
1034}
1035
1036fn add_new_remove_old_builtin_roles_migration(
1037    txn: &mut mz_catalog::durable::Transaction<'_>,
1038) -> Result<(), mz_catalog::durable::CatalogError> {
1039    let mut durable_roles: BTreeMap<_, _> = txn
1040        .get_roles()
1041        .filter(|role| role.id.is_system() || role.id.is_predefined())
1042        .map(|role| (role.name.to_string(), role))
1043        .collect();
1044
1045    // Add new roles.
1046    for builtin_role in BUILTIN_ROLES {
1047        if durable_roles.remove(builtin_role.name).is_none() {
1048            txn.insert_builtin_role(
1049                builtin_role.id,
1050                builtin_role.name.to_string(),
1051                builtin_role.attributes.clone(),
1052                RoleMembership::new(),
1053                RoleVars::default(),
1054                builtin_role.oid,
1055            )?;
1056        }
1057    }
1058
1059    // Remove old roles.
1060    let old_roles = durable_roles.values().map(|role| role.id).collect();
1061    txn.remove_roles(&old_roles)?;
1062
1063    Ok(())
1064}
1065
1066fn add_new_remove_old_builtin_cluster_replicas_migration(
1067    txn: &mut Transaction<'_>,
1068    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1069) -> Result<(), AdapterError> {
1070    let cluster_lookup: BTreeMap<_, _> = txn
1071        .get_clusters()
1072        .map(|cluster| (cluster.name.clone(), cluster.clone()))
1073        .collect();
1074
1075    let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1076        .get_cluster_replicas()
1077        .filter(|replica| replica.replica_id.is_system())
1078        .fold(BTreeMap::new(), |mut acc, replica| {
1079            acc.entry(replica.cluster_id)
1080                .or_insert_with(BTreeMap::new)
1081                .insert(replica.name.to_string(), replica);
1082            acc
1083        });
1084
1085    // Add new replicas.
1086    for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1087        let cluster = cluster_lookup
1088            .get(builtin_replica.cluster_name)
1089            .expect("builtin cluster replica references non-existent cluster");
1090        // `empty_map` is a hack to simplify the if statement below.
1091        let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1092        let replica_names = durable_replicas
1093            .get_mut(&cluster.id)
1094            .unwrap_or(&mut empty_map);
1095
1096        let builtin_cluster_bootstrap_config =
1097            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1098        if replica_names.remove(builtin_replica.name).is_none()
1099            // NOTE(SangJunBak): We need to explicitly check the replication factor because
1100            // BUILT_IN_CLUSTER_REPLICAS is constant throughout all deployments but the replication
1101            // factor is configurable on bootstrap.
1102            && builtin_cluster_bootstrap_config.replication_factor > 0
1103        {
1104            let replica_size = match cluster.config.variant {
1105                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1106                ClusterVariant::Unmanaged => builtin_cluster_bootstrap_config.size,
1107            };
1108
1109            let config = builtin_cluster_replica_config(replica_size);
1110            txn.insert_cluster_replica(
1111                cluster.id,
1112                builtin_replica.name,
1113                config,
1114                MZ_SYSTEM_ROLE_ID,
1115            )?;
1116        }
1117    }
1118
1119    // Remove old replicas.
1120    let old_replicas = durable_replicas
1121        .values()
1122        .flat_map(|replicas| replicas.values().map(|replica| replica.replica_id))
1123        .collect();
1124    txn.remove_cluster_replicas(&old_replicas)?;
1125
1126    Ok(())
1127}
1128
1129/// Roles can have default values for configuration parameters, e.g. you can set a Role default for
1130/// the 'cluster' parameter.
1131///
1132/// This migration exists to remove the Role default for a configuration parameter, if the persisted
1133/// input is no longer valid. For example if we remove a configuration parameter or change the
1134/// accepted set of values.
1135fn remove_invalid_config_param_role_defaults_migration(
1136    txn: &mut Transaction<'_>,
1137) -> Result<(), AdapterError> {
1138    static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1139
1140    let roles_to_migrate: BTreeMap<_, _> = txn
1141        .get_roles()
1142        .filter_map(|mut role| {
1143            // Create an empty SessionVars just so we can check if a var is valid.
1144            //
1145            // TODO(parkmycar): This is a bit hacky, instead we should have a static list of all
1146            // session variables.
1147            let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1148
1149            // Iterate over all of the variable defaults for this role.
1150            let mut invalid_roles_vars = BTreeMap::new();
1151            for (name, value) in &role.vars.map {
1152                // If one does not exist or its value is invalid, then mark it for removal.
1153                let Ok(session_var) = session_vars.inspect(name) else {
1154                    invalid_roles_vars.insert(name.clone(), value.clone());
1155                    continue;
1156                };
1157                if session_var.check(value.borrow()).is_err() {
1158                    invalid_roles_vars.insert(name.clone(), value.clone());
1159                }
1160            }
1161
1162            // If the role has no invalid values, nothing to do!
1163            if invalid_roles_vars.is_empty() {
1164                return None;
1165            }
1166
1167            tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1168
1169            // Otherwise, remove the variables from the role and return it to be updated.
1170            for (name, _value) in invalid_roles_vars {
1171                role.vars.map.remove(&name);
1172            }
1173            Some(role)
1174        })
1175        .map(|role| (role.id, role))
1176        .collect();
1177
1178    txn.update_roles_without_auth(roles_to_migrate)?;
1179
1180    Ok(())
1181}
1182
1183/// Cluster Replicas may be created ephemerally during an alter statement, these replicas
1184/// are marked as pending and should be cleaned up on catalog opsn.
1185fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
1186    for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1187        if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1188            replica.config.location
1189        {
1190            tx.remove_cluster_replica(replica.replica_id)?;
1191        }
1192    }
1193    Ok(())
1194}
1195
1196pub(crate) fn builtin_cluster_replica_config(
1197    replica_size: String,
1198) -> mz_catalog::durable::ReplicaConfig {
1199    mz_catalog::durable::ReplicaConfig {
1200        location: mz_catalog::durable::ReplicaLocation::Managed {
1201            availability_zone: None,
1202            billed_as: None,
1203            pending: false,
1204            internal: false,
1205            size: replica_size,
1206        },
1207        logging: default_logging_config(),
1208    }
1209}
1210
1211fn default_logging_config() -> ReplicaLogging {
1212    ReplicaLogging {
1213        log_logging: false,
1214        interval: Some(Duration::from_secs(1)),
1215    }
1216}
1217
/// Bootstrap size and replication factor for each builtin cluster.
///
/// Looked up by cluster name via `get_config` when builtin clusters and their replicas are
/// created.
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Size and replication factor to default system_cluster on bootstrap
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default catalog_server_cluster on bootstrap
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default probe_cluster on bootstrap
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default support_cluster on bootstrap
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default analytics_cluster on bootstrap
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1231
1232impl BuiltinBootstrapClusterConfigMap {
1233    /// Gets the size of the builtin cluster based on the provided name
1234    fn get_config(
1235        &self,
1236        cluster_name: &str,
1237    ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1238        let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1239            &self.system_cluster
1240        } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1241            &self.catalog_server_cluster
1242        } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1243            &self.probe_cluster
1244        } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1245            &self.support_cluster
1246        } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1247            &self.analytics_cluster
1248        } else {
1249            return Err(mz_catalog::durable::CatalogError::Catalog(
1250                SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1251            ));
1252        };
1253        Ok(cluster_config.clone())
1254    }
1255}
1256
1257/// Convert `updates` into a `Vec` that can be consolidated by doing the following:
1258///
1259///   - Convert each update into a type that implements [`std::cmp::Ord`].
1260///   - Update the timestamp of each update to the same value.
1261///   - Convert the diff of each update to a type that implements
1262///     [`differential_dataflow::difference::Semigroup`].
1263///
1264/// [`mz_catalog::memory::objects::StateUpdateKind`] does not implement [`std::cmp::Ord`] only
1265/// because it contains a variant for temporary items, which do not implement [`std::cmp::Ord`].
1266/// However, we know that during bootstrap no temporary items exist, because they are not persisted
1267/// and are only created after bootstrap is complete. So we forcibly convert each
1268/// [`mz_catalog::memory::objects::StateUpdateKind`] into an [`BootstrapStateUpdateKind`], which is
1269/// identical to [`mz_catalog::memory::objects::StateUpdateKind`] except it doesn't have a
1270/// temporary item variant and does implement [`std::cmp::Ord`].
1271///
1272/// WARNING: Do not call outside of startup.
1273pub(crate) fn into_consolidatable_updates_startup(
1274    updates: Vec<StateUpdate>,
1275    ts: Timestamp,
1276) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1277    updates
1278        .into_iter()
1279        .map(|StateUpdate { kind, ts: _, diff }| {
1280            let kind: BootstrapStateUpdateKind = kind
1281                .try_into()
1282                .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1283            (kind, ts, Diff::from(diff))
1284        })
1285        .collect()
1286}
1287
1288fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1289    defaults: &BTreeMap<String, String>,
1290    remote: Option<&BTreeMap<String, String>>,
1291    cfg: &mz_dyncfg::Config<T>,
1292) -> T::ConfigType {
1293    let mut val = T::into_config_type(cfg.default().clone());
1294    let get_fn = |map: &BTreeMap<String, String>| {
1295        let val = map.get(cfg.name())?;
1296        match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1297            Ok(x) => Some(x),
1298            Err(err) => {
1299                tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1300                None
1301            }
1302        }
1303    };
1304    if let Some(x) = get_fn(defaults) {
1305        val = x;
1306    }
1307    if let Some(x) = remote.and_then(get_fn) {
1308        val = x;
1309    }
1310    val
1311}