mz_adapter/catalog/
open.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to opening a [`Catalog`].
11
12mod builtin_item_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use futures::future::{BoxFuture, FutureExt};
19use itertools::{Either, Itertools};
20use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
21use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
22use mz_auth::hash::scram256_hash;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25    BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
26    Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
27};
28use mz_catalog::config::StateConfig;
29use mz_catalog::durable::objects::{
30    SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
31};
32use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
33use mz_catalog::expr_cache::{
34    ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
35};
36use mz_catalog::memory::error::{Error, ErrorKind};
37use mz_catalog::memory::objects::{
38    BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
39};
40use mz_controller::clusters::ReplicaLogging;
41use mz_controller_types::ClusterId;
42use mz_ore::cast::usize_to_u64;
43use mz_ore::collections::HashSet;
44use mz_ore::now::{SYSTEM_TIME, to_datetime};
45use mz_ore::{instrument, soft_assert_no_log};
46use mz_repr::adt::mz_acl_item::PrivilegeMap;
47use mz_repr::namespaces::is_unstable_schema;
48use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
49use mz_sql::catalog::{
50    BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
51};
52use mz_sql::func::OP_IMPLS;
53use mz_sql::names::CommentObjectId;
54use mz_sql::rbac;
55use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
56use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
57use mz_storage_client::storage_collections::StorageCollections;
58use tracing::{Instrument, info, warn};
59use uuid::Uuid;
60
61// DO NOT add any more imports from `crate` outside of `crate::catalog`.
62use crate::AdapterError;
63use crate::catalog::open::builtin_item_migration::{
64    BuiltinItemMigrationResult, migrate_builtin_items,
65};
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{
68    BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name, migrate,
69};
70
/// The output of [`Catalog::initialize_state`].
pub struct InitializeStateResult {
    /// An initialized [`CatalogState`].
    pub state: CatalogState,
    /// A set of new shards that may need to be initialized (only used by 0dt migration).
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// The [`GlobalId`]s of builtin items that were newly added while initializing the state.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// A list of builtin table updates corresponding to the initialized state.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// The version of the catalog that existed before initializing the catalog, or `"new"` if
    /// the durable catalog had no recorded content version.
    pub last_seen_version: String,
    /// A handle to the expression cache if it's enabled, `None` otherwise.
    pub expr_cache_handle: Option<ExpressionCacheHandle>,
    /// The global expressions that were cached in `expr_cache_handle`. Empty if the expression
    /// cache is disabled.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// The local expressions that were NOT cached in `expr_cache_handle`.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
89
/// The output of [`Catalog::open`].
///
/// Apart from `catalog`, every field is forwarded from the [`InitializeStateResult`] produced
/// while opening; `builtin_table_updates` additionally contains the updates that
/// [`Catalog::open`] appends itself.
pub struct OpenCatalogResult {
    /// An opened [`Catalog`].
    pub catalog: Catalog,
    /// A set of new shards that may need to be initialized.
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// The [`GlobalId`]s of builtin items that were newly added while opening the catalog.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// A list of builtin table updates corresponding to the initialized state.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// The global expressions that were cached in the expression cache.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// The local expressions that were NOT cached in the expression cache.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
104
105impl Catalog {
    /// Initializes a CatalogState. Separate from [`Catalog::open`] to avoid depending on state
    /// external to a [mz_catalog::durable::DurableCatalogState]
    /// (for example: no [mz_secrets::SecretsReader]).
    ///
    /// The work happens in the following phases:
    ///
    /// 1. Build an empty in-memory [`CatalogState`] from `config`.
    /// 2. Sync `storage` to its current updates, open a transaction, and run the durable and
    ///    builtin migrations against it.
    /// 3. Consolidate all updates and apply them to the in-memory state in groups: pre-item,
    ///    system item, item (optionally running the item AST migrations), and post-item
    ///    updates. The expression cache is opened after the pre-item updates.
    /// 4. Generate builtin table updates for the audit log without applying it in memory.
    /// 5. Migrate builtin items, commit the transaction at `config.boot_ts`, and run the
    ///    cleanup action returned by the builtin item migration.
    ///
    /// # Errors
    ///
    /// Returns an error if syncing, migrating, or committing the durable catalog fails, if a
    /// system parameter default cannot be applied for a reason other than the parameter being
    /// unknown, if the `mz_system` password cannot be hashed, if the authentication mock nonce
    /// is missing from the durable catalog, or if the item migrations fail.
    ///
    /// # Panics
    ///
    /// Panics if a builtin role or builtin cluster does not have a reserved name, if the
    /// initial catalog snapshot is empty, if a consolidated diff cannot be converted into a
    /// state diff, or if the expression cache is enabled and either the expression cache shard
    /// or (for dev builds) the build ID is unavailable.
    pub async fn initialize_state<'a>(
        config: StateConfig,
        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
    ) -> Result<InitializeStateResult, AdapterError> {
        // Sanity check: all builtin roles and clusters must use reserved names.
        for builtin_role in BUILTIN_ROLES {
            assert!(
                is_reserved_name(builtin_role.name),
                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }
        for builtin_cluster in BUILTIN_CLUSTERS {
            assert!(
                is_reserved_name(builtin_cluster.name),
                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }

        // Start from fresh system variables, honoring unsafe mode and, if requested, enabling
        // all feature flags by default.
        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
        if config.all_features {
            system_configuration.enable_all_feature_flags_by_default();
        }

        // An empty in-memory state. All collections start out empty and are populated below by
        // applying the updates read from the durable catalog.
        let mut state = CatalogState {
            database_by_name: BTreeMap::new(),
            database_by_id: BTreeMap::new(),
            entry_by_id: BTreeMap::new(),
            entry_by_global_id: BTreeMap::new(),
            ambient_schemas_by_name: BTreeMap::new(),
            ambient_schemas_by_id: BTreeMap::new(),
            clusters_by_name: BTreeMap::new(),
            clusters_by_id: BTreeMap::new(),
            roles_by_name: BTreeMap::new(),
            roles_by_id: BTreeMap::new(),
            network_policies_by_id: BTreeMap::new(),
            role_auth_by_id: BTreeMap::new(),
            network_policies_by_name: BTreeMap::new(),
            system_configuration,
            default_privileges: DefaultPrivileges::default(),
            system_privileges: PrivilegeMap::default(),
            comments: CommentsMap::default(),
            source_references: BTreeMap::new(),
            storage_metadata: Default::default(),
            temporary_schemas: BTreeMap::new(),
            // Filled in below from the durable catalog.
            mock_authentication_nonce: Default::default(),
            config: mz_sql::catalog::CatalogConfig {
                start_time: to_datetime((config.now)()),
                start_instant: Instant::now(),
                nonce: rand::random(),
                environment_id: config.environment_id,
                session_id: Uuid::new_v4(),
                build_info: config.build_info,
                timestamp_interval: Duration::from_secs(1),
                now: config.now.clone(),
                connection_context: config.connection_context,
                builtins_cfg: BuiltinsConfig {
                    // This will fall back to the default in code (false) if we timeout on the
                    // initial LD sync. We're just using this to get some additional testing in on
                    // CTs so a false negative is fine, we're only worried about false positives.
                    include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
                        &config.system_parameter_defaults,
                        config.remote_system_parameters.as_ref(),
                        &ENABLE_CONTINUAL_TASK_BUILTINS,
                    ),
                },
                helm_chart_version: config.helm_chart_version,
            },
            cluster_replica_sizes: config.cluster_replica_sizes,
            availability_zones: config.availability_zones,
            egress_addresses: config.egress_addresses,
            aws_principal_context: config.aws_principal_context,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            http_host_name: config.http_host_name,
            license_key: config.license_key,
        };

        // Read the current contents of the durable catalog. At least one update is expected
        // to be present.
        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
        let mut txn = storage.transaction().await?;

        // Migrate/update durable data before we start loading the in-memory catalog.
        let (migrated_builtins, new_builtin_collections) = {
            migrate::durable_migrate(
                &mut txn,
                state.config.environment_id.organization_id(),
                config.boot_ts,
            )?;
            // Overwrite and persist selected parameter values in `remote_system_parameters` that
            // was pulled from a remote frontend (e.g. LaunchDarkly) if present.
            if let Some(remote_system_parameters) = config.remote_system_parameters {
                for (name, value) in remote_system_parameters {
                    txn.upsert_system_config(&name, value)?;
                }
                txn.set_system_config_synced_once()?;
            }
            // Add any new builtin objects and remove old ones.
            let (migrated_builtins, new_builtin_collections) =
                add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
            // The same bootstrap cluster configuration is used for both the builtin cluster
            // migration and the builtin cluster replica migration.
            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
                system_cluster: config.builtin_system_cluster_config,
                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
                probe_cluster: config.builtin_probe_cluster_config,
                support_cluster: config.builtin_support_cluster_config,
                analytics_cluster: config.builtin_analytics_cluster_config,
            };
            add_new_remove_old_builtin_clusters_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
            )?;
            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
            add_new_remove_old_builtin_cluster_replicas_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
            )?;
            add_new_remove_old_builtin_roles_migration(&mut txn)?;
            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
            (migrated_builtins, new_builtin_collections)
        };
        remove_pending_cluster_replicas_migration(&mut txn)?;

        // Fold the updates produced by the migrations above into the updates read from the
        // durable catalog, so that both are applied to the in-memory state below.
        let op_updates = txn.get_and_commit_op_updates();
        updates.extend(op_updates);

        // Seed the in-memory catalog with values that don't come from the durable catalog.
        {
            // Set defaults from configuration passed in the provided `system_parameter_defaults`
            // map.
            for (name, value) in config.system_parameter_defaults {
                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
                    Ok(_) => (),
                    // Unknown parameters are logged and skipped rather than failing startup.
                    Err(Error {
                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
                    }) => {
                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
                    }
                    Err(e) => return Err(e.into()),
                };
            }
            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
            // If an external login password was configured for `mz_system`, store its hash as
            // the role's auth information.
            if let Some(password) = config.external_login_password_mz_system {
                state.role_auth_by_id.insert(
                    MZ_SYSTEM_ROLE_ID,
                    RoleAuth {
                        role_id: MZ_SYSTEM_ROLE_ID,
                        password_hash: Some(scram256_hash(&password).map_err(|_| {
                            AdapterError::Internal("Failed to hash mz_system password.".to_owned())
                        })?),
                        updated_at: SYSTEM_TIME(),
                    },
                );
            }
        }

        // Accumulates the builtin table updates generated by every apply step below.
        let mut builtin_table_updates = Vec::new();

        // Make life easier by consolidating all updates, so that we end up with only positive
        // diffs.
        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
        differential_dataflow::consolidation::consolidate_updates(&mut updates);
        soft_assert_no_log!(
            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "consolidated updates should be positive during startup: {updates:?}"
        );

        // Partition the updates by kind into the groups that are applied one after another
        // below. Post-item updates are kept in their consolidatable form because the item
        // migrations may contribute more of them, after which they're consolidated again.
        let mut pre_item_updates = Vec::new();
        let mut system_item_updates = Vec::new();
        let mut item_updates = Vec::new();
        let mut post_item_updates = Vec::new();
        let mut audit_log_updates = Vec::new();
        for (kind, ts, diff) in updates {
            match kind {
                BootstrapStateUpdateKind::Role(_)
                | BootstrapStateUpdateKind::RoleAuth(_)
                | BootstrapStateUpdateKind::Database(_)
                | BootstrapStateUpdateKind::Schema(_)
                | BootstrapStateUpdateKind::DefaultPrivilege(_)
                | BootstrapStateUpdateKind::SystemPrivilege(_)
                | BootstrapStateUpdateKind::SystemConfiguration(_)
                | BootstrapStateUpdateKind::Cluster(_)
                | BootstrapStateUpdateKind::NetworkPolicy(_)
                | BootstrapStateUpdateKind::ClusterReplica(_) => {
                    pre_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
                    system_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
                    kind: kind.into(),
                    ts,
                    diff: diff.try_into().expect("valid diff"),
                }),
                BootstrapStateUpdateKind::Comment(_)
                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
                | BootstrapStateUpdateKind::SourceReferences(_)
                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
                    post_item_updates.push((kind, ts, diff));
                }
                BootstrapStateUpdateKind::AuditLog(_) => {
                    audit_log_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    });
                }
            }
        }

        // Apply the pre-item updates. The expression cache has not been opened yet, so a
        // closed local expression cache is passed in.
        let builtin_table_update = state
            .apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        let expr_cache_start = Instant::now();
        info!("startup: coordinator init: catalog open: expr cache open beginning");
        // We wait until after the `pre_item_updates` to open the cache so we can get accurate
        // dyncfgs because the `pre_item_updates` contains `SystemConfiguration` updates.
        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
        // An explicit override in the config takes precedence over the dyncfg.
        let expr_cache_enabled = config
            .enable_expression_cache_override
            .unwrap_or(enable_expr_cache_dyncfg);
        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
            info!(
                ?config.enable_expression_cache_override,
                ?enable_expr_cache_dyncfg,
                "using expression cache for startup"
            );
            // The global IDs currently present in the durable catalog: all versions of every
            // item plus every system object mapping.
            let current_ids = txn
                .get_items()
                .flat_map(|item| {
                    let gid = item.global_id.clone();
                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
                    std::iter::once(gid).chain(gids.into_iter())
                })
                .chain(
                    txn.get_system_object_mappings()
                        .map(|som| som.unique_identifier.global_id),
                )
                .collect();
            let dyncfgs = config.persist_client.dyncfgs().clone();
            let build_version = if config.build_info.is_dev() {
                // A single dev version can be used for many different builds, so we need to use
                // the build version that is also enriched with build metadata.
                config
                    .build_info
                    .semver_version_build()
                    .expect("build ID is not available on your platform!")
            } else {
                config.build_info.semver_version()
            };
            // Note that `remove_prior_versions` is only set when NOT in read-only mode, while
            // `compact_shard` is only set when in read-only mode.
            let expr_cache_config = ExpressionCacheConfig {
                build_version,
                shard_id: txn
                    .get_expression_cache_shard()
                    .expect("expression cache shard should exist for opened catalogs"),
                persist: config.persist_client,
                current_ids,
                remove_prior_versions: !config.read_only,
                compact_shard: config.read_only,
                dyncfgs,
            };
            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
            (
                Some(expr_cache_handle),
                cached_local_exprs,
                cached_global_exprs,
            )
        } else {
            // Expression cache disabled: no handle and nothing cached.
            (None, BTreeMap::new(), BTreeMap::new())
        };
        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
        info!(
            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
            expr_cache_start.elapsed()
        );

        // Apply the system item updates, now with the local expression cache available.
        let builtin_table_update = state
            .apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // The content version recorded in the durable catalog, or "new" if there is none.
        let last_seen_version = txn
            .get_catalog_content_version()
            .unwrap_or("new")
            .to_string();

        // The authentication mock nonce must be present in the durable catalog; fail
        // initialization otherwise.
        let mz_authentication_mock_nonce =
            txn.get_authentication_mock_nonce().ok_or_else(|| {
                Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
            })?;

        state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);

        // Migrate item ASTs.
        let builtin_table_update = if !config.skip_migrations {
            let migrate_result = migrate::migrate(
                &mut state,
                &mut txn,
                &mut local_expr_cache,
                item_updates,
                config.now,
                config.boot_ts,
            )
            .await
            .map_err(|e| {
                Error::new(ErrorKind::FailedMigration {
                    last_seen_version: last_seen_version.clone(),
                    this_version: config.build_info.version,
                    cause: e.to_string(),
                })
            })?;
            if !migrate_result.post_item_updates.is_empty() {
                // Include any post-item-updates generated by migrations, and then consolidate
                // them to ensure diffs are all positive.
                post_item_updates.extend(migrate_result.post_item_updates);
                // Push everything to the same timestamp so it consolidates cleanly.
                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
                    for (_, ts, _) in &mut post_item_updates {
                        *ts = max_ts;
                    }
                }
                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
            }

            migrate_result.builtin_table_updates
        } else {
            // Migrations are skipped, so the item updates are applied directly.
            state
                .apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
                .await
        };
        builtin_table_updates.extend(builtin_table_update);

        // Convert the (possibly re-consolidated) post-item updates into state updates and
        // apply them.
        let post_item_updates = post_item_updates
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate {
                kind: kind.into(),
                ts,
                diff: diff.try_into().expect("valid diff"),
            })
            .collect();
        let builtin_table_update = state
            .apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // We don't need to apply the audit logs in memory, yet apply can be expensive when the
        // audit log grows large. Therefore, we skip the apply step and just generate the builtin
        // updates.
        for audit_log_update in audit_log_updates {
            builtin_table_updates.extend(
                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
            );
        }

        // Migrate builtin items.
        let BuiltinItemMigrationResult {
            builtin_table_updates: builtin_table_update,
            migrated_storage_collections_0dt,
            cleanup_action,
        } = migrate_builtin_items(
            &mut state,
            &mut txn,
            &mut local_expr_cache,
            migrated_builtins,
            config.builtin_item_migration_config,
        )
        .await?;
        builtin_table_updates.extend(builtin_table_update);
        // Resolve all accumulated builtin table updates against the final state.
        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);

        // Commit all durable changes made during initialization at the boot timestamp.
        txn.commit(config.boot_ts).await?;

        // The builtin item migration's cleanup action only runs after a successful commit.
        cleanup_action.await;

        Ok(InitializeStateResult {
            state,
            migrated_storage_collections_0dt,
            new_builtin_collections: new_builtin_collections.into_iter().collect(),
            builtin_table_updates,
            last_seen_version,
            expr_cache_handle,
            cached_global_exprs,
            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
        })
    }
504
505    /// Opens or creates a catalog that stores data at `path`.
506    ///
507    /// Returns the catalog, metadata about builtin objects that have changed
508    /// schemas since last restart, a list of updates to builtin tables that
509    /// describe the initial state of the catalog, and the version of the
510    /// catalog before any migrations were performed.
511    ///
512    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 17KB. This would
513    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
514    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
515    #[instrument(name = "catalog::open")]
516    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
517        async move {
518            let mut storage = config.storage;
519
520            let InitializeStateResult {
521                state,
522                migrated_storage_collections_0dt,
523                new_builtin_collections,
524                mut builtin_table_updates,
525                last_seen_version: _,
526                expr_cache_handle,
527                cached_global_exprs,
528                uncached_local_exprs,
529            } =
530                // BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
531                // get stored on the stack which is bad for runtime performance, and blow up our stack usage.
532                // Because of that we purposefully move this Future onto the heap (i.e. Box it).
533                Self::initialize_state(config.state, &mut storage)
534                    .instrument(tracing::info_span!("catalog::initialize_state"))
535                    .boxed()
536                    .await?;
537
538            let catalog = Catalog {
539                state,
540                plans: CatalogPlans::default(),
541                expr_cache_handle,
542                transient_revision: 1,
543                storage: Arc::new(tokio::sync::Mutex::new(storage)),
544            };
545
546            // Operators aren't stored in the catalog, but we would like them in
547            // introspection views.
548            for (op, func) in OP_IMPLS.iter() {
549                match func {
550                    mz_sql::func::Func::Scalar(impls) => {
551                        for imp in impls {
552                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
553                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
554                            ));
555                        }
556                    }
557                    _ => unreachable!("all operators must be scalar functions"),
558                }
559            }
560
561            for ip in &catalog.state.egress_addresses {
562                builtin_table_updates.push(
563                    catalog
564                        .state
565                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
566                );
567            }
568
569            if !catalog.state.license_key.id.is_empty() {
570                builtin_table_updates.push(
571                    catalog.state.resolve_builtin_table_update(
572                        catalog
573                            .state
574                            .pack_license_key_update(&catalog.state.license_key)?,
575                    ),
576                );
577            }
578
579            catalog.storage().await.mark_bootstrap_complete();
580
581            Ok(OpenCatalogResult {
582                catalog,
583                migrated_storage_collections_0dt,
584                new_builtin_collections,
585                builtin_table_updates,
586                cached_global_exprs,
587                uncached_local_exprs,
588            })
589        }
590        .instrument(tracing::info_span!("catalog::open"))
591        .boxed()
592    }
593
594    /// Initializes STORAGE to understand all shards that `self` expects to
595    /// exist.
596    ///
597    /// Note that this must be done before creating/rendering collections
598    /// because the storage controller might not be aware of new system
599    /// collections created between versions.
600    async fn initialize_storage_state(
601        &mut self,
602        storage_collections: &Arc<
603            dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
604        >,
605    ) -> Result<(), mz_catalog::durable::CatalogError> {
606        let collections = self
607            .entries()
608            .filter(|entry| entry.item().is_storage_collection())
609            .flat_map(|entry| entry.global_ids())
610            .collect();
611
612        // Clone the state so that any errors that occur do not leak any
613        // transformations on error.
614        let mut state = self.state.clone();
615
616        let mut storage = self.storage().await;
617        let mut txn = storage.transaction().await?;
618
619        storage_collections
620            .initialize_state(&mut txn, collections)
621            .await
622            .map_err(mz_catalog::durable::DurableCatalogError::from)?;
623
624        let updates = txn.get_and_commit_op_updates();
625        let builtin_updates = state.apply_updates(updates)?;
626        assert!(builtin_updates.is_empty());
627        let commit_ts = txn.upper();
628        txn.commit(commit_ts).await?;
629        drop(storage);
630
631        // Save updated state.
632        self.state = state;
633        Ok(())
634    }
635
636    /// [`mz_controller::Controller`] depends on durable catalog state to boot,
637    /// so make it available and initialize the controller.
638    pub async fn initialize_controller(
639        &mut self,
640        config: mz_controller::ControllerConfig,
641        envd_epoch: core::num::NonZeroI64,
642        read_only: bool,
643    ) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
644    {
645        let controller_start = Instant::now();
646        info!("startup: controller init: beginning");
647
648        let controller = {
649            let mut storage = self.storage().await;
650            let mut tx = storage.transaction().await?;
651            mz_controller::prepare_initialization(&mut tx)
652                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
653            let updates = tx.get_and_commit_op_updates();
654            assert!(
655                updates.is_empty(),
656                "initializing controller should not produce updates: {updates:?}"
657            );
658            let commit_ts = tx.upper();
659            tx.commit(commit_ts).await?;
660
661            let read_only_tx = storage.transaction().await?;
662
663            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
664        };
665
666        self.initialize_storage_state(&controller.storage_collections)
667            .await?;
668
669        info!(
670            "startup: controller init: complete in {:?}",
671            controller_start.elapsed()
672        );
673
674        Ok(controller)
675    }
676
677    /// Politely releases all external resources that can only be released in an async context.
678    pub async fn expire(self) {
679        // If no one else holds a reference to storage, then clean up the storage resources.
680        // Otherwise, hopefully the other reference cleans up the resources when it's dropped.
681        if let Some(storage) = Arc::into_inner(self.storage) {
682            let storage = storage.into_inner();
683            storage.expire().await;
684        }
685    }
686}
687
688impl CatalogState {
689    /// Set the default value for `name`, which is the value it will be reset to.
690    fn set_system_configuration_default(
691        &mut self,
692        name: &str,
693        value: VarInput,
694    ) -> Result<(), Error> {
695        Ok(self.system_configuration.set_default(name, value)?)
696    }
697}
698
699/// Updates the catalog with new and removed builtin items.
700///
701/// Returns the list of builtin [`GlobalId`]s that need to be migrated, and the list of new builtin
702/// [`GlobalId`]s.
703fn add_new_remove_old_builtin_items_migration(
704    builtins_cfg: &BuiltinsConfig,
705    txn: &mut mz_catalog::durable::Transaction<'_>,
706) -> Result<(Vec<CatalogItemId>, Vec<GlobalId>), mz_catalog::durable::CatalogError> {
707    let mut new_builtin_mappings = Vec::new();
708    let mut migrated_builtin_ids = Vec::new();
709    // Used to validate unique descriptions.
710    let mut builtin_descs = HashSet::new();
711
712    // We compare the builtin items that are compiled into the binary with the builtin items that
713    // are persisted in the catalog to discover new, deleted, and migrated builtin items.
714    let mut builtins = Vec::new();
715    for builtin in BUILTINS::iter(builtins_cfg) {
716        let desc = SystemObjectDescription {
717            schema_name: builtin.schema().to_string(),
718            object_type: builtin.catalog_item_type(),
719            object_name: builtin.name().to_string(),
720        };
721        // Validate that the description is unique.
722        if !builtin_descs.insert(desc.clone()) {
723            panic!(
724                "duplicate builtin description: {:?}, {:?}",
725                SystemObjectDescription {
726                    schema_name: builtin.schema().to_string(),
727                    object_type: builtin.catalog_item_type(),
728                    object_name: builtin.name().to_string(),
729                },
730                builtin
731            );
732        }
733        builtins.push((desc, builtin));
734    }
735
736    let mut system_object_mappings: BTreeMap<_, _> = txn
737        .get_system_object_mappings()
738        .map(|system_object_mapping| {
739            (
740                system_object_mapping.description.clone(),
741                system_object_mapping,
742            )
743        })
744        .collect();
745
746    let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
747        builtins.into_iter().partition_map(|(desc, builtin)| {
748            let fingerprint = match builtin.runtime_alterable() {
749                false => builtin.fingerprint(),
750                true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
751            };
752            match system_object_mappings.remove(&desc) {
753                Some(system_object_mapping) => {
754                    Either::Left((builtin, system_object_mapping, fingerprint))
755                }
756                None => Either::Right((builtin, fingerprint)),
757            }
758        });
759    let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
760    let new_builtins: Vec<_> = new_builtins
761        .into_iter()
762        .zip_eq(new_builtin_ids.clone())
763        .collect();
764
765    // Look for migrated builtins.
766    for (builtin, system_object_mapping, fingerprint) in existing_builtins.iter().cloned() {
767        if system_object_mapping.unique_identifier.fingerprint != fingerprint {
768            // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly,
769            // it was cause the table to be truncated because the contents are not also
770            // stored in the durable catalog. Secondly, we prune `mz_storage_usage_by_shard`
771            // of old events in the background on startup. The correctness of that pruning
772            // relies on there being no other retractions to `mz_storage_usage_by_shard`.
773            assert_ne!(
774                *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, system_object_mapping.description,
775                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
776            );
777            assert_ne!(
778                builtin.catalog_item_type(),
779                CatalogItemType::Type,
780                "types cannot be migrated"
781            );
782            assert_ne!(
783                system_object_mapping.unique_identifier.fingerprint,
784                RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
785                "clearing the runtime alterable flag on an existing object is not permitted",
786            );
787            assert!(
788                !builtin.runtime_alterable(),
789                "setting the runtime alterable flag on an existing object is not permitted"
790            );
791            migrated_builtin_ids.push(system_object_mapping.unique_identifier.catalog_id);
792        }
793    }
794
795    // Add new builtin items to catalog.
796    for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
797        new_builtin_mappings.push(SystemObjectMapping {
798            description: SystemObjectDescription {
799                schema_name: builtin.schema().to_string(),
800                object_type: builtin.catalog_item_type(),
801                object_name: builtin.name().to_string(),
802            },
803            unique_identifier: SystemObjectUniqueIdentifier {
804                catalog_id,
805                global_id,
806                fingerprint,
807            },
808        });
809
810        // Runtime-alterable system objects are durably recorded to the
811        // usual items collection, so that they can be later altered at
812        // runtime by their owner (i.e., outside of the usual builtin
813        // migration framework that requires changes to the binary
814        // itself).
815        let handled_runtime_alterable = match builtin {
816            Builtin::Connection(c) if c.runtime_alterable => {
817                let mut acl_items = vec![rbac::owner_privilege(
818                    mz_sql::catalog::ObjectType::Connection,
819                    c.owner_id.clone(),
820                )];
821                acl_items.extend_from_slice(c.access);
822                // Builtin Connections cannot be versioned.
823                let versions = BTreeMap::new();
824
825                txn.insert_item(
826                    catalog_id,
827                    c.oid,
828                    global_id,
829                    mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
830                    c.name,
831                    c.sql.into(),
832                    *c.owner_id,
833                    acl_items,
834                    versions,
835                )?;
836                true
837            }
838            _ => false,
839        };
840        assert_eq!(
841            builtin.runtime_alterable(),
842            handled_runtime_alterable,
843            "runtime alterable object was not handled by migration",
844        );
845    }
846    txn.set_system_object_mappings(new_builtin_mappings)?;
847
848    // Update comments of all builtin objects
849    let builtins_with_catalog_ids = existing_builtins
850        .iter()
851        .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
852        .chain(
853            new_builtins
854                .into_iter()
855                .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
856        );
857
858    for (builtin, id) in builtins_with_catalog_ids {
859        let (comment_id, desc, comments) = match builtin {
860            Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
861            Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
862            Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
863            Builtin::Log(_)
864            | Builtin::Type(_)
865            | Builtin::Func(_)
866            | Builtin::ContinualTask(_)
867            | Builtin::Index(_)
868            | Builtin::Connection(_) => continue,
869        };
870        txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;
871
872        let mut comments = comments.clone();
873        for (col_idx, name) in desc.iter_names().enumerate() {
874            if let Some(comment) = comments.remove(name.as_str()) {
875                // Comment column indices are 1 based
876                txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
877            }
878        }
879        assert!(
880            comments.is_empty(),
881            "builtin object contains dangling comments that don't correspond to columns {comments:?}"
882        );
883    }
884
885    // Anything left in `system_object_mappings` must have been deleted and should be removed from
886    // the catalog.
887    let mut deleted_system_objects = BTreeSet::new();
888    let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
889    let mut deleted_comments = BTreeSet::new();
890    for (desc, mapping) in system_object_mappings {
891        deleted_system_objects.insert(mapping.description);
892        if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
893            deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
894        }
895
896        let id = mapping.unique_identifier.catalog_id;
897        let comment_id = match desc.object_type {
898            CatalogItemType::Table => CommentObjectId::Table(id),
899            CatalogItemType::Source => CommentObjectId::Source(id),
900            CatalogItemType::View => CommentObjectId::View(id),
901            CatalogItemType::Sink
902            | CatalogItemType::MaterializedView
903            | CatalogItemType::Index
904            | CatalogItemType::Type
905            | CatalogItemType::Func
906            | CatalogItemType::Secret
907            | CatalogItemType::Connection
908            | CatalogItemType::ContinualTask => continue,
909        };
910        deleted_comments.insert(comment_id);
911    }
912    // If you are 100% positive that it is safe to delete a system object outside any of the
913    // unstable schemas, then add it to this set. Make sure that no prod environments are
914    // using this object and that the upgrade checker does not show any issues.
915    //
916    // Objects can be removed from this set after one release.
917    let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
918    // TODO(jkosh44) Technically we could support changing the type of a builtin object outside
919    // of unstable schemas (i.e. from a table to a view). However, builtin migrations don't currently
920    // handle that scenario correctly.
921    assert!(
922        deleted_system_objects
923            .iter()
924            // It's okay if Indexes change because they're inherently ephemeral.
925            .filter(|object| object.object_type != CatalogItemType::Index)
926            .all(
927                |deleted_object| is_unstable_schema(&deleted_object.schema_name)
928                    || delete_exceptions.contains(deleted_object)
929            ),
930        "only objects in unstable schemas can be deleted, deleted objects: {:?}",
931        deleted_system_objects
932    );
933    txn.drop_comments(&deleted_comments)?;
934    txn.remove_items(&deleted_runtime_alterable_system_ids)?;
935    txn.remove_system_object_mappings(deleted_system_objects)?;
936
937    // Filter down to just the GlobalIds which are used to track the underlying collections.
938    let new_builtin_collections = new_builtin_ids
939        .into_iter()
940        .map(|(_catalog_id, global_id)| global_id)
941        .collect();
942
943    Ok((migrated_builtin_ids, new_builtin_collections))
944}
945
946fn add_new_remove_old_builtin_clusters_migration(
947    txn: &mut mz_catalog::durable::Transaction<'_>,
948    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
949) -> Result<(), mz_catalog::durable::CatalogError> {
950    let mut durable_clusters: BTreeMap<_, _> = txn
951        .get_clusters()
952        .filter(|cluster| cluster.id.is_system())
953        .map(|cluster| (cluster.name.to_string(), cluster))
954        .collect();
955
956    // Add new clusters.
957    for builtin_cluster in BUILTIN_CLUSTERS {
958        if durable_clusters.remove(builtin_cluster.name).is_none() {
959            let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
960
961            txn.insert_system_cluster(
962                builtin_cluster.name,
963                vec![],
964                builtin_cluster.privileges.to_vec(),
965                builtin_cluster.owner_id.to_owned(),
966                mz_catalog::durable::ClusterConfig {
967                    variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
968                        size: cluster_config.size,
969                        availability_zones: vec![],
970                        replication_factor: cluster_config.replication_factor,
971                        logging: default_logging_config(),
972                        optimizer_feature_overrides: Default::default(),
973                        schedule: Default::default(),
974                    }),
975                    workload_class: None,
976                },
977                &HashSet::new(),
978            )?;
979        }
980    }
981
982    // Remove old clusters.
983    let old_clusters = durable_clusters
984        .values()
985        .map(|cluster| cluster.id)
986        .collect();
987    txn.remove_clusters(&old_clusters)?;
988
989    Ok(())
990}
991
992fn add_new_remove_old_builtin_introspection_source_migration(
993    txn: &mut mz_catalog::durable::Transaction<'_>,
994) -> Result<(), AdapterError> {
995    let mut new_indexes = Vec::new();
996    let mut removed_indexes = BTreeSet::new();
997    for cluster in txn.get_clusters() {
998        let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
999
1000        let mut new_logs = Vec::new();
1001
1002        for log in BUILTINS::logs() {
1003            if introspection_source_index_ids.remove(log.name).is_none() {
1004                new_logs.push(log);
1005            }
1006        }
1007
1008        for log in new_logs {
1009            let (item_id, gid) =
1010                Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1011            new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1012        }
1013
1014        // Anything left in `introspection_source_index_ids` must have been deleted and should be
1015        // removed from the catalog.
1016        removed_indexes.extend(
1017            introspection_source_index_ids
1018                .into_keys()
1019                .map(|name| (cluster.id, name.to_string())),
1020        );
1021    }
1022    txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1023    txn.remove_introspection_source_indexes(removed_indexes)?;
1024    Ok(())
1025}
1026
1027fn add_new_remove_old_builtin_roles_migration(
1028    txn: &mut mz_catalog::durable::Transaction<'_>,
1029) -> Result<(), mz_catalog::durable::CatalogError> {
1030    let mut durable_roles: BTreeMap<_, _> = txn
1031        .get_roles()
1032        .filter(|role| role.id.is_system() || role.id.is_predefined())
1033        .map(|role| (role.name.to_string(), role))
1034        .collect();
1035
1036    // Add new roles.
1037    for builtin_role in BUILTIN_ROLES {
1038        if durable_roles.remove(builtin_role.name).is_none() {
1039            txn.insert_builtin_role(
1040                builtin_role.id,
1041                builtin_role.name.to_string(),
1042                builtin_role.attributes.clone(),
1043                RoleMembership::new(),
1044                RoleVars::default(),
1045                builtin_role.oid,
1046            )?;
1047        }
1048    }
1049
1050    // Remove old roles.
1051    let old_roles = durable_roles.values().map(|role| role.id).collect();
1052    txn.remove_roles(&old_roles)?;
1053
1054    Ok(())
1055}
1056
1057fn add_new_remove_old_builtin_cluster_replicas_migration(
1058    txn: &mut Transaction<'_>,
1059    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1060) -> Result<(), AdapterError> {
1061    let cluster_lookup: BTreeMap<_, _> = txn
1062        .get_clusters()
1063        .map(|cluster| (cluster.name.clone(), cluster.clone()))
1064        .collect();
1065
1066    let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1067        .get_cluster_replicas()
1068        .filter(|replica| replica.replica_id.is_system())
1069        .fold(BTreeMap::new(), |mut acc, replica| {
1070            acc.entry(replica.cluster_id)
1071                .or_insert_with(BTreeMap::new)
1072                .insert(replica.name.to_string(), replica);
1073            acc
1074        });
1075
1076    // Add new replicas.
1077    for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1078        let cluster = cluster_lookup
1079            .get(builtin_replica.cluster_name)
1080            .expect("builtin cluster replica references non-existent cluster");
1081        // `empty_map` is a hack to simplify the if statement below.
1082        let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1083        let replica_names = durable_replicas
1084            .get_mut(&cluster.id)
1085            .unwrap_or(&mut empty_map);
1086
1087        let builtin_cluster_boostrap_config =
1088            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1089        if replica_names.remove(builtin_replica.name).is_none()
1090            // NOTE(SangJunBak): We need to explicitly check the replication factor because
1091            // BUILT_IN_CLUSTER_REPLICAS is constant throughout all deployments but the replication
1092            // factor is configurable on bootstrap.
1093            && builtin_cluster_boostrap_config.replication_factor > 0
1094        {
1095            let replica_size = match cluster.config.variant {
1096                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1097                ClusterVariant::Unmanaged => builtin_cluster_boostrap_config.size,
1098            };
1099
1100            let config = builtin_cluster_replica_config(replica_size);
1101            txn.insert_cluster_replica(
1102                cluster.id,
1103                builtin_replica.name,
1104                config,
1105                MZ_SYSTEM_ROLE_ID,
1106            )?;
1107        }
1108    }
1109
1110    // Remove old replicas.
1111    let old_replicas = durable_replicas
1112        .values()
1113        .flat_map(|replicas| replicas.values().map(|replica| replica.replica_id))
1114        .collect();
1115    txn.remove_cluster_replicas(&old_replicas)?;
1116
1117    Ok(())
1118}
1119
1120/// Roles can have default values for configuration parameters, e.g. you can set a Role default for
1121/// the 'cluster' parameter.
1122///
1123/// This migration exists to remove the Role default for a configuration parameter, if the persisted
1124/// input is no longer valid. For example if we remove a configuration parameter or change the
1125/// accepted set of values.
1126fn remove_invalid_config_param_role_defaults_migration(
1127    txn: &mut Transaction<'_>,
1128) -> Result<(), AdapterError> {
1129    static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1130
1131    let roles_to_migrate: BTreeMap<_, _> = txn
1132        .get_roles()
1133        .filter_map(|mut role| {
1134            // Create an empty SessionVars just so we can check if a var is valid.
1135            //
1136            // TODO(parkmycar): This is a bit hacky, instead we should have a static list of all
1137            // session variables.
1138            let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1139
1140            // Iterate over all of the variable defaults for this role.
1141            let mut invalid_roles_vars = BTreeMap::new();
1142            for (name, value) in &role.vars.map {
1143                // If one does not exist or its value is invalid, then mark it for removal.
1144                let Ok(session_var) = session_vars.inspect(name) else {
1145                    invalid_roles_vars.insert(name.clone(), value.clone());
1146                    continue;
1147                };
1148                if session_var.check(value.borrow()).is_err() {
1149                    invalid_roles_vars.insert(name.clone(), value.clone());
1150                }
1151            }
1152
1153            // If the role has no invalid values, nothing to do!
1154            if invalid_roles_vars.is_empty() {
1155                return None;
1156            }
1157
1158            tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1159
1160            // Otherwise, remove the variables from the role and return it to be updated.
1161            for (name, _value) in invalid_roles_vars {
1162                role.vars.map.remove(&name);
1163            }
1164            Some(role)
1165        })
1166        .map(|role| (role.id, role))
1167        .collect();
1168
1169    txn.update_roles_without_auth(roles_to_migrate)?;
1170
1171    Ok(())
1172}
1173
1174/// Cluster Replicas may be created ephemerally during an alter statement, these replicas
1175/// are marked as pending and should be cleaned up on catalog opsn.
1176fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
1177    for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1178        if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1179            replica.config.location
1180        {
1181            tx.remove_cluster_replica(replica.replica_id)?;
1182        }
1183    }
1184    Ok(())
1185}
1186
1187pub(crate) fn builtin_cluster_replica_config(
1188    replica_size: String,
1189) -> mz_catalog::durable::ReplicaConfig {
1190    mz_catalog::durable::ReplicaConfig {
1191        location: mz_catalog::durable::ReplicaLocation::Managed {
1192            availability_zone: None,
1193            billed_as: None,
1194            pending: false,
1195            internal: false,
1196            size: replica_size,
1197        },
1198        logging: default_logging_config(),
1199    }
1200}
1201
1202fn default_logging_config() -> ReplicaLogging {
1203    ReplicaLogging {
1204        log_logging: false,
1205        interval: Some(Duration::from_secs(1)),
1206    }
1207}
1208
/// Bootstrap configuration (size and replication factor) for each of the builtin clusters.
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Size and replication factor to default system_cluster on bootstrap
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default catalog_server_cluster on bootstrap
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default probe_cluster on bootstrap
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default support_cluster on bootstrap
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default analytics_cluster on bootstrap
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1222
1223impl BuiltinBootstrapClusterConfigMap {
1224    /// Gets the size of the builtin cluster based on the provided name
1225    fn get_config(
1226        &self,
1227        cluster_name: &str,
1228    ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1229        let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1230            &self.system_cluster
1231        } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1232            &self.catalog_server_cluster
1233        } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1234            &self.probe_cluster
1235        } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1236            &self.support_cluster
1237        } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1238            &self.analytics_cluster
1239        } else {
1240            return Err(mz_catalog::durable::CatalogError::Catalog(
1241                SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1242            ));
1243        };
1244        Ok(cluster_config.clone())
1245    }
1246}
1247
1248/// Convert `updates` into a `Vec` that can be consolidated by doing the following:
1249///
1250///   - Convert each update into a type that implements [`std::cmp::Ord`].
1251///   - Update the timestamp of each update to the same value.
1252///   - Convert the diff of each update to a type that implements
1253///     [`differential_dataflow::difference::Semigroup`].
1254///
1255/// [`mz_catalog::memory::objects::StateUpdateKind`] does not implement [`std::cmp::Ord`] only
1256/// because it contains a variant for temporary items, which do not implement [`std::cmp::Ord`].
1257/// However, we know that during bootstrap no temporary items exist, because they are not persisted
1258/// and are only created after bootstrap is complete. So we forcibly convert each
1259/// [`mz_catalog::memory::objects::StateUpdateKind`] into an [`BootstrapStateUpdateKind`], which is
1260/// identical to [`mz_catalog::memory::objects::StateUpdateKind`] except it doesn't have a
1261/// temporary item variant and does implement [`std::cmp::Ord`].
1262///
1263/// WARNING: Do not call outside of startup.
1264pub(crate) fn into_consolidatable_updates_startup(
1265    updates: Vec<StateUpdate>,
1266    ts: Timestamp,
1267) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1268    updates
1269        .into_iter()
1270        .map(|StateUpdate { kind, ts: _, diff }| {
1271            let kind: BootstrapStateUpdateKind = kind
1272                .try_into()
1273                .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1274            (kind, ts, Diff::from(diff))
1275        })
1276        .collect()
1277}
1278
1279fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1280    defaults: &BTreeMap<String, String>,
1281    remote: Option<&BTreeMap<String, String>>,
1282    cfg: &mz_dyncfg::Config<T>,
1283) -> T::ConfigType {
1284    let mut val = T::into_config_type(cfg.default().clone());
1285    let get_fn = |map: &BTreeMap<String, String>| {
1286        let val = map.get(cfg.name())?;
1287        match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1288            Ok(x) => Some(x),
1289            Err(err) => {
1290                tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1291                None
1292            }
1293        }
1294    };
1295    if let Some(x) = get_fn(defaults) {
1296        val = x;
1297    }
1298    if let Some(x) = remote.and_then(get_fn) {
1299        val = x;
1300    }
1301    val
1302}