mz_adapter/catalog/open.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to opening a [`Catalog`].
11
12mod builtin_item_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use futures::future::{BoxFuture, FutureExt};
19use itertools::{Either, Itertools};
20use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
21use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
22use mz_catalog::SYSTEM_CONN_ID;
23use mz_catalog::builtin::{
24    BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
25    Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
26};
27use mz_catalog::config::{ClusterReplicaSizeMap, StateConfig};
28use mz_catalog::durable::objects::{
29    SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
30};
31use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
32use mz_catalog::expr_cache::{
33    ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
34};
35use mz_catalog::memory::error::{Error, ErrorKind};
36use mz_catalog::memory::objects::{
37    BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, StateUpdate,
38};
39use mz_controller::clusters::{ReplicaAllocation, ReplicaLogging};
40use mz_controller_types::ClusterId;
41use mz_ore::cast::usize_to_u64;
42use mz_ore::collections::HashSet;
43use mz_ore::now::to_datetime;
44use mz_ore::{instrument, soft_assert_no_log};
45use mz_repr::adt::mz_acl_item::PrivilegeMap;
46use mz_repr::namespaces::is_unstable_schema;
47use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
48use mz_sql::catalog::{
49    BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
50};
51use mz_sql::func::OP_IMPLS;
52use mz_sql::names::CommentObjectId;
53use mz_sql::rbac;
54use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
55use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
56use mz_storage_client::storage_collections::StorageCollections;
57use timely::Container;
58use tracing::{Instrument, info, warn};
59use uuid::Uuid;
60
61// DO NOT add any more imports from `crate` outside of `crate::catalog`.
62use crate::AdapterError;
63use crate::catalog::open::builtin_item_migration::{
64    BuiltinItemMigrationResult, migrate_builtin_items,
65};
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{
68    BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name, migrate,
69};
70
71pub struct InitializeStateResult {
72    /// An initialized [`CatalogState`].
73    pub state: CatalogState,
74    /// A set of new shards that may need to be initialized (only used by 0dt migration).
75    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
76    /// A set of new builtin items.
77    pub new_builtin_collections: BTreeSet<GlobalId>,
78    /// A list of builtin table updates corresponding to the initialized state.
79    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
80    /// The version of the catalog that existed before initializing the catalog.
81    pub last_seen_version: String,
82    /// A handle to the expression cache if it's enabled.
83    pub expr_cache_handle: Option<ExpressionCacheHandle>,
84    /// The global expressions that were cached in `expr_cache_handle`.
85    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
86    /// The local expressions that were NOT cached in `expr_cache_handle`.
87    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
88}
89
90pub struct OpenCatalogResult {
91    /// An opened [`Catalog`].
92    pub catalog: Catalog,
93    /// A set of new shards that may need to be initialized.
94    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
95    /// A set of new builtin items.
96    pub new_builtin_collections: BTreeSet<GlobalId>,
97    /// A list of builtin table updates corresponding to the initialized state.
98    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
99    /// The global expressions that were cached in the expression cache.
100    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
101    /// The local expressions that were NOT cached in the expression cache.
102    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
103}
104
105impl Catalog {
106    /// Initializes a CatalogState. Separate from [`Catalog::open`] to avoid depending on state
107    /// external to a [mz_catalog::durable::DurableCatalogState]
108    /// (for example: no [mz_secrets::SecretsReader]).
109    pub async fn initialize_state<'a>(
110        config: StateConfig,
111        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
112    ) -> Result<InitializeStateResult, AdapterError> {
113        for builtin_role in BUILTIN_ROLES {
114            assert!(
115                is_reserved_name(builtin_role.name),
116                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
117                BUILTIN_PREFIXES.join(", ")
118            );
119        }
120        for builtin_cluster in BUILTIN_CLUSTERS {
121            assert!(
122                is_reserved_name(builtin_cluster.name),
123                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
124                BUILTIN_PREFIXES.join(", ")
125            );
126        }
127
128        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
129        if config.all_features {
130            system_configuration.enable_all_feature_flags_by_default();
131        }
132
133        let mut state = CatalogState {
134            database_by_name: BTreeMap::new(),
135            database_by_id: BTreeMap::new(),
136            entry_by_id: BTreeMap::new(),
137            entry_by_global_id: BTreeMap::new(),
138            ambient_schemas_by_name: BTreeMap::new(),
139            ambient_schemas_by_id: BTreeMap::new(),
140            clusters_by_name: BTreeMap::new(),
141            clusters_by_id: BTreeMap::new(),
142            roles_by_name: BTreeMap::new(),
143            roles_by_id: BTreeMap::new(),
144            network_policies_by_id: BTreeMap::new(),
145            role_auth_by_id: BTreeMap::new(),
146            network_policies_by_name: BTreeMap::new(),
147            system_configuration,
148            default_privileges: DefaultPrivileges::default(),
149            system_privileges: PrivilegeMap::default(),
150            comments: CommentsMap::default(),
151            source_references: BTreeMap::new(),
152            storage_metadata: Default::default(),
153            temporary_schemas: BTreeMap::new(),
154            config: mz_sql::catalog::CatalogConfig {
155                start_time: to_datetime((config.now)()),
156                start_instant: Instant::now(),
157                nonce: rand::random(),
158                environment_id: config.environment_id,
159                session_id: Uuid::new_v4(),
160                build_info: config.build_info,
161                timestamp_interval: Duration::from_secs(1),
162                now: config.now.clone(),
163                connection_context: config.connection_context,
164                builtins_cfg: BuiltinsConfig {
165                    // This will fall back to the default in code (false) if we timeout on the
166                    // initial LD sync. We're just using this to get some additional testing in on
167                    // CTs so a false negative is fine, we're only worried about false positives.
168                    include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
169                        &config.system_parameter_defaults,
170                        config.remote_system_parameters.as_ref(),
171                        &ENABLE_CONTINUAL_TASK_BUILTINS,
172                    ),
173                },
174                helm_chart_version: config.helm_chart_version,
175            },
176            cluster_replica_sizes: config.cluster_replica_sizes,
177            availability_zones: config.availability_zones,
178            egress_addresses: config.egress_addresses,
179            aws_principal_context: config.aws_principal_context,
180            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
181            http_host_name: config.http_host_name,
182        };
183
184        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
185        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
186        let mut txn = storage.transaction().await?;
187
188        // Migrate/update durable data before we start loading the in-memory catalog.
189        let (migrated_builtins, new_builtin_collections) = {
190            migrate::durable_migrate(
191                &mut txn,
192                state.config.environment_id.organization_id(),
193                config.boot_ts,
194            )?;
195            // Overwrite and persist selected parameter values in `remote_system_parameters` that
196            // was pulled from a remote frontend (e.g. LaunchDarkly) if present.
197            if let Some(remote_system_parameters) = config.remote_system_parameters {
198                for (name, value) in remote_system_parameters {
199                    txn.upsert_system_config(&name, value)?;
200                }
201                txn.set_system_config_synced_once()?;
202            }
203            // Add any new builtin objects and remove old ones.
204            let (migrated_builtins, new_builtin_collections) =
205                add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
206            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
207                system_cluster: config.builtin_system_cluster_config,
208                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
209                probe_cluster: config.builtin_probe_cluster_config,
210                support_cluster: config.builtin_support_cluster_config,
211                analytics_cluster: config.builtin_analytics_cluster_config,
212            };
213            add_new_remove_old_builtin_clusters_migration(
214                &mut txn,
215                &builtin_bootstrap_cluster_config_map,
216                &state.cluster_replica_sizes,
217            )?;
218            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
219            add_new_remove_old_builtin_cluster_replicas_migration(
220                &mut txn,
221                &builtin_bootstrap_cluster_config_map,
222                &state.cluster_replica_sizes,
223            )?;
224            add_new_remove_old_builtin_roles_migration(&mut txn)?;
225            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
226            (migrated_builtins, new_builtin_collections)
227        };
228        remove_pending_cluster_replicas_migration(&mut txn)?;
229
230        let op_updates = txn.get_and_commit_op_updates();
231        updates.extend(op_updates);
232
233        // Seed the in-memory catalog with values that don't come from the durable catalog.
234        {
235            // Set defaults from configuration passed in the provided `system_parameter_defaults`
236            // map.
237            for (name, value) in config.system_parameter_defaults {
238                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
239                    Ok(_) => (),
240                    Err(Error {
241                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
242                    }) => {
243                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
244                    }
245                    Err(e) => return Err(e.into()),
246                };
247            }
248            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
249        }
250
251        let mut builtin_table_updates = Vec::new();
252
253        // Make life easier by consolidating all updates, so that we end up with only positive
254        // diffs.
255        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
256        differential_dataflow::consolidation::consolidate_updates(&mut updates);
257        soft_assert_no_log!(
258            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
259            "consolidated updates should be positive during startup: {updates:?}"
260        );
261
262        let mut pre_item_updates = Vec::new();
263        let mut system_item_updates = Vec::new();
264        let mut item_updates = Vec::new();
265        let mut post_item_updates = Vec::new();
266        let mut audit_log_updates = Vec::new();
267        for (kind, ts, diff) in updates {
268            match kind {
269                BootstrapStateUpdateKind::Role(_)
270                | BootstrapStateUpdateKind::RoleAuth(_)
271                | BootstrapStateUpdateKind::Database(_)
272                | BootstrapStateUpdateKind::Schema(_)
273                | BootstrapStateUpdateKind::DefaultPrivilege(_)
274                | BootstrapStateUpdateKind::SystemPrivilege(_)
275                | BootstrapStateUpdateKind::SystemConfiguration(_)
276                | BootstrapStateUpdateKind::Cluster(_)
277                | BootstrapStateUpdateKind::NetworkPolicy(_)
278                | BootstrapStateUpdateKind::ClusterReplica(_) => {
279                    pre_item_updates.push(StateUpdate {
280                        kind: kind.into(),
281                        ts,
282                        diff: diff.try_into().expect("valid diff"),
283                    })
284                }
285                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
286                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
287                    system_item_updates.push(StateUpdate {
288                        kind: kind.into(),
289                        ts,
290                        diff: diff.try_into().expect("valid diff"),
291                    })
292                }
293                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
294                    kind: kind.into(),
295                    ts,
296                    diff: diff.try_into().expect("valid diff"),
297                }),
298                BootstrapStateUpdateKind::Comment(_)
299                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
300                | BootstrapStateUpdateKind::SourceReferences(_)
301                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
302                    post_item_updates.push((kind, ts, diff));
303                }
304                BootstrapStateUpdateKind::AuditLog(_) => {
305                    audit_log_updates.push(StateUpdate {
306                        kind: kind.into(),
307                        ts,
308                        diff: diff.try_into().expect("valid diff"),
309                    });
310                }
311            }
312        }
313
314        let builtin_table_update = state
315            .apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
316            .await;
317        builtin_table_updates.extend(builtin_table_update);
318
319        let expr_cache_start = Instant::now();
320        info!("startup: coordinator init: catalog open: expr cache open beginning");
321        // We wait until after the `pre_item_updates` to open the cache so we can get accurate
322        // dyncfgs because the `pre_item_updates` contains `SystemConfiguration` updates.
323        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
324        let expr_cache_enabled = config.enable_0dt_deployment
325            && config
326                .enable_expression_cache_override
327                .unwrap_or(enable_expr_cache_dyncfg);
328        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
329            info!(
330                ?config.enable_0dt_deployment,
331                ?config.enable_expression_cache_override,
332                ?enable_expr_cache_dyncfg,
333                "using expression cache for startup"
334            );
335            let current_ids = txn
336                .get_items()
337                .flat_map(|item| {
338                    let gid = item.global_id.clone();
339                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
340                    std::iter::once(gid).chain(gids.into_iter())
341                })
342                .chain(
343                    txn.get_system_object_mappings()
344                        .map(|som| som.unique_identifier.global_id),
345                )
346                .collect();
347            let dyncfgs = config.persist_client.dyncfgs().clone();
348            let build_version = if config.build_info.is_dev() {
349                // A single dev version can be used for many different builds, so we need to use
350                // the build version that is also enriched with build metadata.
351                config
352                    .build_info
353                    .semver_version_build()
354                    .expect("build ID is not available on your platform!")
355            } else {
356                config.build_info.semver_version()
357            };
358            let expr_cache_config = ExpressionCacheConfig {
359                build_version,
360                shard_id: txn
361                    .get_expression_cache_shard()
362                    .expect("expression cache shard should exist for opened catalogs"),
363                persist: config.persist_client,
364                current_ids,
365                remove_prior_versions: !config.read_only,
366                compact_shard: config.read_only,
367                dyncfgs,
368            };
369            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
370                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
371            (
372                Some(expr_cache_handle),
373                cached_local_exprs,
374                cached_global_exprs,
375            )
376        } else {
377            (None, BTreeMap::new(), BTreeMap::new())
378        };
379        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
380        info!(
381            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
382            expr_cache_start.elapsed()
383        );
384
385        let builtin_table_update = state
386            .apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
387            .await;
388        builtin_table_updates.extend(builtin_table_update);
389
390        let last_seen_version = txn
391            .get_catalog_content_version()
392            .unwrap_or("new")
393            .to_string();
394
395        // Migrate item ASTs.
396        let builtin_table_update = if !config.skip_migrations {
397            let migrate_result = migrate::migrate(
398                &mut state,
399                &mut txn,
400                &mut local_expr_cache,
401                item_updates,
402                config.now,
403                config.boot_ts,
404            )
405            .await
406            .map_err(|e| {
407                Error::new(ErrorKind::FailedMigration {
408                    last_seen_version: last_seen_version.clone(),
409                    this_version: config.build_info.version,
410                    cause: e.to_string(),
411                })
412            })?;
413            if !migrate_result.post_item_updates.is_empty() {
414                // Include any post-item-updates generated by migrations, and then consolidate
415                // them to ensure diffs are all positive.
416                post_item_updates.extend(migrate_result.post_item_updates);
417                // Push everything to the same timestamp so it consolidates cleanly.
418                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
419                    for (_, ts, _) in &mut post_item_updates {
420                        *ts = max_ts;
421                    }
422                }
423                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
424            }
425
426            migrate_result.builtin_table_updates
427        } else {
428            state
429                .apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
430                .await
431        };
432        builtin_table_updates.extend(builtin_table_update);
433
434        let post_item_updates = post_item_updates
435            .into_iter()
436            .map(|(kind, ts, diff)| StateUpdate {
437                kind: kind.into(),
438                ts,
439                diff: diff.try_into().expect("valid diff"),
440            })
441            .collect();
442        let builtin_table_update = state
443            .apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
444            .await;
445        builtin_table_updates.extend(builtin_table_update);
446
447        // We don't need to apply the audit logs in memory, yet apply can be expensive when the
448        // audit log grows large. Therefore, we skip the apply step and just generate the builtin
449        // updates.
450        for audit_log_update in audit_log_updates {
451            builtin_table_updates.extend(
452                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
453            );
454        }
455
456        // Migrate builtin items.
457        let BuiltinItemMigrationResult {
458            builtin_table_updates: builtin_table_update,
459            migrated_storage_collections_0dt,
460            cleanup_action,
461        } = migrate_builtin_items(
462            &mut state,
463            &mut txn,
464            &mut local_expr_cache,
465            migrated_builtins,
466            config.builtin_item_migration_config,
467        )
468        .await?;
469        builtin_table_updates.extend(builtin_table_update);
470        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);
471
472        txn.commit(config.boot_ts).await?;
473
474        cleanup_action.await;
475
476        Ok(InitializeStateResult {
477            state,
478            migrated_storage_collections_0dt,
479            new_builtin_collections: new_builtin_collections.into_iter().collect(),
480            builtin_table_updates,
481            last_seen_version,
482            expr_cache_handle,
483            cached_global_exprs,
484            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
485        })
486    }
487
488    /// Opens or creates a catalog that stores data at `path`.
489    ///
490    /// Returns the catalog, metadata about builtin objects that have changed
491    /// schemas since last restart, a list of updates to builtin tables that
492    /// describe the initial state of the catalog, and the version of the
493    /// catalog before any migrations were performed.
494    ///
495    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 17KB. This would
496    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
497    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
498    #[instrument(name = "catalog::open")]
499    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
500        async move {
501            let mut storage = config.storage;
502
503            let InitializeStateResult {
504                state,
505                migrated_storage_collections_0dt,
506                new_builtin_collections,
507                mut builtin_table_updates,
508                last_seen_version: _,
509                expr_cache_handle,
510                cached_global_exprs,
511                uncached_local_exprs,
512            } =
513                // BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
514                // get stored on the stack which is bad for runtime performance, and blow up our stack usage.
515                // Because of that we purposefully move this Future onto the heap (i.e. Box it).
516                Self::initialize_state(config.state, &mut storage)
517                    .instrument(tracing::info_span!("catalog::initialize_state"))
518                    .boxed()
519                    .await?;
520
521            let catalog = Catalog {
522                state,
523                plans: CatalogPlans::default(),
524                expr_cache_handle,
525                transient_revision: 1,
526                storage: Arc::new(tokio::sync::Mutex::new(storage)),
527            };
528
529            // Operators aren't stored in the catalog, but we would like them in
530            // introspection views.
531            for (op, func) in OP_IMPLS.iter() {
532                match func {
533                    mz_sql::func::Func::Scalar(impls) => {
534                        for imp in impls {
535                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
536                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
537                            ));
538                        }
539                    }
540                    _ => unreachable!("all operators must be scalar functions"),
541                }
542            }
543
544            for ip in &catalog.state.egress_addresses {
545                builtin_table_updates.push(
546                    catalog
547                        .state
548                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
549                );
550            }
551
552            catalog.storage().await.mark_bootstrap_complete();
553
554            Ok(OpenCatalogResult {
555                catalog,
556                migrated_storage_collections_0dt,
557                new_builtin_collections,
558                builtin_table_updates,
559                cached_global_exprs,
560                uncached_local_exprs,
561            })
562        }
563        .instrument(tracing::info_span!("catalog::open"))
564        .boxed()
565    }
566
567    /// Initializes STORAGE to understand all shards that `self` expects to
568    /// exist.
569    ///
570    /// Note that this must be done before creating/rendering collections
571    /// because the storage controller might not be aware of new system
572    /// collections created between versions.
573    async fn initialize_storage_state(
574        &mut self,
575        storage_collections: &Arc<
576            dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
577        >,
578    ) -> Result<(), mz_catalog::durable::CatalogError> {
579        let collections = self
580            .entries()
581            .filter(|entry| entry.item().is_storage_collection())
582            .flat_map(|entry| entry.global_ids())
583            .collect();
584
585        // Clone the state so that any errors that occur do not leak any
586        // transformations on error.
587        let mut state = self.state.clone();
588
589        let mut storage = self.storage().await;
590        let mut txn = storage.transaction().await?;
591
592        storage_collections
593            .initialize_state(&mut txn, collections)
594            .await
595            .map_err(mz_catalog::durable::DurableCatalogError::from)?;
596
597        let updates = txn.get_and_commit_op_updates();
598        let builtin_updates = state.apply_updates(updates)?;
599        assert!(builtin_updates.is_empty());
600        let commit_ts = txn.upper();
601        txn.commit(commit_ts).await?;
602        drop(storage);
603
604        // Save updated state.
605        self.state = state;
606        Ok(())
607    }
608
609    /// [`mz_controller::Controller`] depends on durable catalog state to boot,
610    /// so make it available and initialize the controller.
611    pub async fn initialize_controller(
612        &mut self,
613        config: mz_controller::ControllerConfig,
614        envd_epoch: core::num::NonZeroI64,
615        read_only: bool,
616    ) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
617    {
618        let controller_start = Instant::now();
619        info!("startup: controller init: beginning");
620
621        let controller = {
622            let mut storage = self.storage().await;
623            let mut tx = storage.transaction().await?;
624            mz_controller::prepare_initialization(&mut tx)
625                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
626            let updates = tx.get_and_commit_op_updates();
627            assert!(
628                updates.is_empty(),
629                "initializing controller should not produce updates: {updates:?}"
630            );
631            let commit_ts = tx.upper();
632            tx.commit(commit_ts).await?;
633
634            let read_only_tx = storage.transaction().await?;
635
636            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
637        };
638
639        self.initialize_storage_state(&controller.storage_collections)
640            .await?;
641
642        info!(
643            "startup: controller init: complete in {:?}",
644            controller_start.elapsed()
645        );
646
647        Ok(controller)
648    }
649
650    /// Politely releases all external resources that can only be released in an async context.
651    pub async fn expire(self) {
652        // If no one else holds a reference to storage, then clean up the storage resources.
653        // Otherwise, hopefully the other reference cleans up the resources when it's dropped.
654        if let Some(storage) = Arc::into_inner(self.storage) {
655            let storage = storage.into_inner();
656            storage.expire().await;
657        }
658    }
659}
660
661impl CatalogState {
662    /// Set the default value for `name`, which is the value it will be reset to.
663    fn set_system_configuration_default(
664        &mut self,
665        name: &str,
666        value: VarInput,
667    ) -> Result<(), Error> {
668        Ok(self.system_configuration.set_default(name, value)?)
669    }
670}
671
672/// Updates the catalog with new and removed builtin items.
673///
674/// Returns the list of builtin [`GlobalId`]s that need to be migrated, and the list of new builtin
675/// [`GlobalId`]s.
676fn add_new_remove_old_builtin_items_migration(
677    builtins_cfg: &BuiltinsConfig,
678    txn: &mut mz_catalog::durable::Transaction<'_>,
679) -> Result<(Vec<CatalogItemId>, Vec<GlobalId>), mz_catalog::durable::CatalogError> {
680    let mut new_builtin_mappings = Vec::new();
681    let mut migrated_builtin_ids = Vec::new();
682    // Used to validate unique descriptions.
683    let mut builtin_descs = HashSet::new();
684
685    // We compare the builtin items that are compiled into the binary with the builtin items that
686    // are persisted in the catalog to discover new, deleted, and migrated builtin items.
687    let mut builtins = Vec::new();
688    for builtin in BUILTINS::iter(builtins_cfg) {
689        let desc = SystemObjectDescription {
690            schema_name: builtin.schema().to_string(),
691            object_type: builtin.catalog_item_type(),
692            object_name: builtin.name().to_string(),
693        };
694        // Validate that the description is unique.
695        if !builtin_descs.insert(desc.clone()) {
696            panic!(
697                "duplicate builtin description: {:?}, {:?}",
698                SystemObjectDescription {
699                    schema_name: builtin.schema().to_string(),
700                    object_type: builtin.catalog_item_type(),
701                    object_name: builtin.name().to_string(),
702                },
703                builtin
704            );
705        }
706        builtins.push((desc, builtin));
707    }
708
709    let mut system_object_mappings: BTreeMap<_, _> = txn
710        .get_system_object_mappings()
711        .map(|system_object_mapping| {
712            (
713                system_object_mapping.description.clone(),
714                system_object_mapping,
715            )
716        })
717        .collect();
718
719    let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
720        builtins.into_iter().partition_map(|(desc, builtin)| {
721            let fingerprint = match builtin.runtime_alterable() {
722                false => builtin.fingerprint(),
723                true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
724            };
725            match system_object_mappings.remove(&desc) {
726                Some(system_object_mapping) => {
727                    Either::Left((builtin, system_object_mapping, fingerprint))
728                }
729                None => Either::Right((builtin, fingerprint)),
730            }
731        });
732    let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
733    let new_builtins: Vec<_> = new_builtins
734        .into_iter()
735        .zip(new_builtin_ids.clone())
736        .collect();
737
738    // Look for migrated builtins.
739    for (builtin, system_object_mapping, fingerprint) in existing_builtins.iter().cloned() {
740        if system_object_mapping.unique_identifier.fingerprint != fingerprint {
741            // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly,
742            // it was cause the table to be truncated because the contents are not also
743            // stored in the durable catalog. Secondly, we prune `mz_storage_usage_by_shard`
744            // of old events in the background on startup. The correctness of that pruning
745            // relies on there being no other retractions to `mz_storage_usage_by_shard`.
746            assert_ne!(
747                *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, system_object_mapping.description,
748                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
749            );
750            assert_ne!(
751                builtin.catalog_item_type(),
752                CatalogItemType::Type,
753                "types cannot be migrated"
754            );
755            assert_ne!(
756                system_object_mapping.unique_identifier.fingerprint,
757                RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
758                "clearing the runtime alterable flag on an existing object is not permitted",
759            );
760            assert!(
761                !builtin.runtime_alterable(),
762                "setting the runtime alterable flag on an existing object is not permitted"
763            );
764            migrated_builtin_ids.push(system_object_mapping.unique_identifier.catalog_id);
765        }
766    }
767
768    // Add new builtin items to catalog.
769    for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
770        new_builtin_mappings.push(SystemObjectMapping {
771            description: SystemObjectDescription {
772                schema_name: builtin.schema().to_string(),
773                object_type: builtin.catalog_item_type(),
774                object_name: builtin.name().to_string(),
775            },
776            unique_identifier: SystemObjectUniqueIdentifier {
777                catalog_id,
778                global_id,
779                fingerprint,
780            },
781        });
782
783        // Runtime-alterable system objects are durably recorded to the
784        // usual items collection, so that they can be later altered at
785        // runtime by their owner (i.e., outside of the usual builtin
786        // migration framework that requires changes to the binary
787        // itself).
788        let handled_runtime_alterable = match builtin {
789            Builtin::Connection(c) if c.runtime_alterable => {
790                let mut acl_items = vec![rbac::owner_privilege(
791                    mz_sql::catalog::ObjectType::Connection,
792                    c.owner_id.clone(),
793                )];
794                acl_items.extend_from_slice(c.access);
795                // Builtin Connections cannot be versioned.
796                let versions = BTreeMap::new();
797
798                txn.insert_item(
799                    catalog_id,
800                    c.oid,
801                    global_id,
802                    mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
803                    c.name,
804                    c.sql.into(),
805                    *c.owner_id,
806                    acl_items,
807                    versions,
808                )?;
809                true
810            }
811            _ => false,
812        };
813        assert_eq!(
814            builtin.runtime_alterable(),
815            handled_runtime_alterable,
816            "runtime alterable object was not handled by migration",
817        );
818    }
819    txn.set_system_object_mappings(new_builtin_mappings)?;
820
821    // Update comments of all builtin objects
822    let builtins_with_catalog_ids = existing_builtins
823        .iter()
824        .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
825        .chain(
826            new_builtins
827                .into_iter()
828                .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
829        );
830
831    for (builtin, id) in builtins_with_catalog_ids {
832        let (comment_id, desc, comments) = match builtin {
833            Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
834            Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
835            Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
836            Builtin::Log(_)
837            | Builtin::Type(_)
838            | Builtin::Func(_)
839            | Builtin::ContinualTask(_)
840            | Builtin::Index(_)
841            | Builtin::Connection(_) => continue,
842        };
843        txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;
844
845        let mut comments = comments.clone();
846        for (col_idx, name) in desc.iter_names().enumerate() {
847            if let Some(comment) = comments.remove(name.as_str()) {
848                // Comment column indices are 1 based
849                txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
850            }
851        }
852        assert!(
853            comments.is_empty(),
854            "builtin object contains dangling comments that don't correspond to columns {comments:?}"
855        );
856    }
857
858    // Anything left in `system_object_mappings` must have been deleted and should be removed from
859    // the catalog.
860    let mut deleted_system_objects = BTreeSet::new();
861    let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
862    for (_, mapping) in system_object_mappings {
863        deleted_system_objects.insert(mapping.description);
864        if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
865            deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
866        }
867    }
868    // If you are 100% positive that it is safe to delete a system object outside any of the
869    // unstable schemas, then add it to this set. Make sure that no prod environments are
870    // using this object and that the upgrade checker does not show any issues.
871    //
872    // Objects can be removed from this set after one release.
873    let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
874    // TODO(jkosh44) Technically we could support changing the type of a builtin object outside
875    // of unstable schemas (i.e. from a table to a view). However, builtin migrations don't currently
876    // handle that scenario correctly.
877    assert!(
878        deleted_system_objects
879            .iter()
880            // It's okay if Indexes change because they're inherently ephemeral.
881            .filter(|object| object.object_type != CatalogItemType::Index)
882            .all(
883                |deleted_object| is_unstable_schema(&deleted_object.schema_name)
884                    || delete_exceptions.contains(deleted_object)
885            ),
886        "only objects in unstable schemas can be deleted, deleted objects: {:?}",
887        deleted_system_objects
888    );
889    txn.remove_items(&deleted_runtime_alterable_system_ids)?;
890    txn.remove_system_object_mappings(deleted_system_objects)?;
891
892    // Filter down to just the GlobalIds which are used to track the underlying collections.
893    let new_builtin_collections = new_builtin_ids
894        .into_iter()
895        .map(|(_catalog_id, global_id)| global_id)
896        .collect();
897
898    Ok((migrated_builtin_ids, new_builtin_collections))
899}
900
901fn add_new_remove_old_builtin_clusters_migration(
902    txn: &mut mz_catalog::durable::Transaction<'_>,
903    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
904    cluster_sizes: &ClusterReplicaSizeMap,
905) -> Result<(), mz_catalog::durable::CatalogError> {
906    let mut durable_clusters: BTreeMap<_, _> = txn
907        .get_clusters()
908        .filter(|cluster| cluster.id.is_system())
909        .map(|cluster| (cluster.name.to_string(), cluster))
910        .collect();
911
912    // Add new clusters.
913    for builtin_cluster in BUILTIN_CLUSTERS {
914        if durable_clusters.remove(builtin_cluster.name).is_none() {
915            let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
916            let cluster_allocation = cluster_sizes.get_allocation_by_name(&cluster_config.size)?;
917
918            txn.insert_system_cluster(
919                builtin_cluster.name,
920                vec![],
921                builtin_cluster.privileges.to_vec(),
922                builtin_cluster.owner_id.to_owned(),
923                mz_catalog::durable::ClusterConfig {
924                    variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
925                        size: cluster_config.size,
926                        availability_zones: vec![],
927                        replication_factor: cluster_config.replication_factor,
928                        disk: cluster_allocation.is_cc,
929                        logging: default_logging_config(),
930                        optimizer_feature_overrides: Default::default(),
931                        schedule: Default::default(),
932                    }),
933                    workload_class: None,
934                },
935                &HashSet::new(),
936            )?;
937        }
938    }
939
940    // Remove old clusters.
941    let old_clusters = durable_clusters
942        .values()
943        .map(|cluster| cluster.id)
944        .collect();
945    txn.remove_clusters(&old_clusters)?;
946
947    Ok(())
948}
949
950fn add_new_remove_old_builtin_introspection_source_migration(
951    txn: &mut mz_catalog::durable::Transaction<'_>,
952) -> Result<(), AdapterError> {
953    let mut new_indexes = Vec::new();
954    let mut removed_indexes = BTreeSet::new();
955    for cluster in txn.get_clusters() {
956        let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
957
958        let mut new_logs = Vec::new();
959
960        for log in BUILTINS::logs() {
961            if introspection_source_index_ids.remove(log.name).is_none() {
962                new_logs.push(log);
963            }
964        }
965
966        for log in new_logs {
967            let (item_id, gid) =
968                Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
969            new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
970        }
971
972        // Anything left in `introspection_source_index_ids` must have been deleted and should be
973        // removed from the catalog.
974        removed_indexes.extend(
975            introspection_source_index_ids
976                .into_keys()
977                .map(|name| (cluster.id, name.to_string())),
978        );
979    }
980    txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
981    txn.remove_introspection_source_indexes(removed_indexes)?;
982    Ok(())
983}
984
985fn add_new_remove_old_builtin_roles_migration(
986    txn: &mut mz_catalog::durable::Transaction<'_>,
987) -> Result<(), mz_catalog::durable::CatalogError> {
988    let mut durable_roles: BTreeMap<_, _> = txn
989        .get_roles()
990        .filter(|role| role.id.is_system() || role.id.is_predefined())
991        .map(|role| (role.name.to_string(), role))
992        .collect();
993
994    // Add new roles.
995    for builtin_role in BUILTIN_ROLES {
996        if durable_roles.remove(builtin_role.name).is_none() {
997            txn.insert_builtin_role(
998                builtin_role.id,
999                builtin_role.name.to_string(),
1000                builtin_role.attributes.clone(),
1001                RoleMembership::new(),
1002                RoleVars::default(),
1003                builtin_role.oid,
1004            )?;
1005        }
1006    }
1007
1008    // Remove old roles.
1009    let old_roles = durable_roles.values().map(|role| role.id).collect();
1010    txn.remove_roles(&old_roles)?;
1011
1012    Ok(())
1013}
1014
1015fn add_new_remove_old_builtin_cluster_replicas_migration(
1016    txn: &mut Transaction<'_>,
1017    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1018    cluster_sizes: &ClusterReplicaSizeMap,
1019) -> Result<(), AdapterError> {
1020    let cluster_lookup: BTreeMap<_, _> = txn
1021        .get_clusters()
1022        .map(|cluster| (cluster.name.clone(), cluster.clone()))
1023        .collect();
1024
1025    let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1026        .get_cluster_replicas()
1027        .filter(|replica| replica.replica_id.is_system())
1028        .fold(BTreeMap::new(), |mut acc, replica| {
1029            acc.entry(replica.cluster_id)
1030                .or_insert_with(BTreeMap::new)
1031                .insert(replica.name.to_string(), replica);
1032            acc
1033        });
1034
1035    // Add new replicas.
1036    for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1037        let cluster = cluster_lookup
1038            .get(builtin_replica.cluster_name)
1039            .expect("builtin cluster replica references non-existent cluster");
1040        // `empty_map` is a hack to simplify the if statement below.
1041        let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1042        let replica_names = durable_replicas
1043            .get_mut(&cluster.id)
1044            .unwrap_or(&mut empty_map);
1045
1046        let builtin_cluster_boostrap_config =
1047            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1048        if replica_names.remove(builtin_replica.name).is_none()
1049            // NOTE(SangJunBak): We need to explicitly check the replication factor because
1050            // BUILT_IN_CLUSTER_REPLICAS is constant throughout all deployments but the replication
1051            // factor is configurable on bootstrap.
1052            && builtin_cluster_boostrap_config.replication_factor > 0
1053        {
1054            let replica_size = match cluster.config.variant {
1055                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1056                ClusterVariant::Unmanaged => builtin_cluster_boostrap_config.size,
1057            };
1058            let replica_allocation = cluster_sizes.get_allocation_by_name(&replica_size)?;
1059
1060            let config = builtin_cluster_replica_config(replica_size, replica_allocation);
1061            txn.insert_cluster_replica(
1062                cluster.id,
1063                builtin_replica.name,
1064                config,
1065                MZ_SYSTEM_ROLE_ID,
1066            )?;
1067        }
1068    }
1069
1070    // Remove old replicas.
1071    let old_replicas = durable_replicas
1072        .values()
1073        .flat_map(|replicas| replicas.values().map(|replica| replica.replica_id))
1074        .collect();
1075    txn.remove_cluster_replicas(&old_replicas)?;
1076
1077    Ok(())
1078}
1079
1080/// Roles can have default values for configuration parameters, e.g. you can set a Role default for
1081/// the 'cluster' parameter.
1082///
1083/// This migration exists to remove the Role default for a configuration parameter, if the persisted
1084/// input is no longer valid. For example if we remove a configuration parameter or change the
1085/// accepted set of values.
1086fn remove_invalid_config_param_role_defaults_migration(
1087    txn: &mut Transaction<'_>,
1088) -> Result<(), AdapterError> {
1089    static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1090
1091    let roles_to_migrate: BTreeMap<_, _> = txn
1092        .get_roles()
1093        .filter_map(|mut role| {
1094            // Create an empty SessionVars just so we can check if a var is valid.
1095            //
1096            // TODO(parkmycar): This is a bit hacky, instead we should have a static list of all
1097            // session variables.
1098            let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1099
1100            // Iterate over all of the variable defaults for this role.
1101            let mut invalid_roles_vars = BTreeMap::new();
1102            for (name, value) in &role.vars.map {
1103                // If one does not exist or its value is invalid, then mark it for removal.
1104                let Ok(session_var) = session_vars.inspect(name) else {
1105                    invalid_roles_vars.insert(name.clone(), value.clone());
1106                    continue;
1107                };
1108                if session_var.check(value.borrow()).is_err() {
1109                    invalid_roles_vars.insert(name.clone(), value.clone());
1110                }
1111            }
1112
1113            // If the role has no invalid values, nothing to do!
1114            if invalid_roles_vars.is_empty() {
1115                return None;
1116            }
1117
1118            tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1119
1120            // Otherwise, remove the variables from the role and return it to be updated.
1121            for (name, _value) in invalid_roles_vars {
1122                role.vars.map.remove(&name);
1123            }
1124            Some(role)
1125        })
1126        .map(|role| (role.id, role))
1127        .collect();
1128
1129    txn.update_roles_without_auth(roles_to_migrate)?;
1130
1131    Ok(())
1132}
1133
1134/// Cluster Replicas may be created ephemerally during an alter statement, these replicas
1135/// are marked as pending and should be cleaned up on catalog opsn.
1136fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
1137    for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1138        if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1139            replica.config.location
1140        {
1141            tx.remove_cluster_replica(replica.replica_id)?;
1142        }
1143    }
1144    Ok(())
1145}
1146
1147pub(crate) fn builtin_cluster_replica_config(
1148    replica_size: String,
1149    replica_allocation: &ReplicaAllocation,
1150) -> mz_catalog::durable::ReplicaConfig {
1151    mz_catalog::durable::ReplicaConfig {
1152        location: mz_catalog::durable::ReplicaLocation::Managed {
1153            availability_zone: None,
1154            billed_as: None,
1155            disk: replica_allocation.is_cc,
1156            pending: false,
1157            internal: false,
1158            size: replica_size,
1159        },
1160        logging: default_logging_config(),
1161    }
1162}
1163
1164fn default_logging_config() -> ReplicaLogging {
1165    ReplicaLogging {
1166        log_logging: false,
1167        interval: Some(Duration::from_secs(1)),
1168    }
1169}
1170
1171#[derive(Debug)]
1172pub struct BuiltinBootstrapClusterConfigMap {
1173    /// Size and replication factor to default system_cluster on bootstrap
1174    pub system_cluster: BootstrapBuiltinClusterConfig,
1175    /// Size and replication factor to default catalog_server_cluster on bootstrap
1176    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
1177    /// Size and replication factor to default probe_cluster on bootstrap
1178    pub probe_cluster: BootstrapBuiltinClusterConfig,
1179    /// Size and replication factor to default support_cluster on bootstrap
1180    pub support_cluster: BootstrapBuiltinClusterConfig,
1181    /// Size to default analytics_cluster on bootstrap
1182    pub analytics_cluster: BootstrapBuiltinClusterConfig,
1183}
1184
1185impl BuiltinBootstrapClusterConfigMap {
1186    /// Gets the size of the builtin cluster based on the provided name
1187    fn get_config(
1188        &self,
1189        cluster_name: &str,
1190    ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1191        let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1192            &self.system_cluster
1193        } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1194            &self.catalog_server_cluster
1195        } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1196            &self.probe_cluster
1197        } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1198            &self.support_cluster
1199        } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1200            &self.analytics_cluster
1201        } else {
1202            return Err(mz_catalog::durable::CatalogError::Catalog(
1203                SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1204            ));
1205        };
1206        Ok(cluster_config.clone())
1207    }
1208}
1209
1210/// Convert `updates` into a `Vec` that can be consolidated by doing the following:
1211///
1212///   - Convert each update into a type that implements [`std::cmp::Ord`].
1213///   - Update the timestamp of each update to the same value.
1214///   - Convert the diff of each update to a type that implements
1215///     [`differential_dataflow::difference::Semigroup`].
1216///
1217/// [`mz_catalog::memory::objects::StateUpdateKind`] does not implement [`std::cmp::Ord`] only
1218/// because it contains a variant for temporary items, which do not implement [`std::cmp::Ord`].
1219/// However, we know that during bootstrap no temporary items exist, because they are not persisted
1220/// and are only created after bootstrap is complete. So we forcibly convert each
1221/// [`mz_catalog::memory::objects::StateUpdateKind`] into an [`BootstrapStateUpdateKind`], which is
1222/// identical to [`mz_catalog::memory::objects::StateUpdateKind`] except it doesn't have a
1223/// temporary item variant and does implement [`std::cmp::Ord`].
1224///
1225/// WARNING: Do not call outside of startup.
1226pub(crate) fn into_consolidatable_updates_startup(
1227    updates: Vec<StateUpdate>,
1228    ts: Timestamp,
1229) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1230    updates
1231        .into_iter()
1232        .map(|StateUpdate { kind, ts: _, diff }| {
1233            let kind: BootstrapStateUpdateKind = kind
1234                .try_into()
1235                .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1236            (kind, ts, Diff::from(diff))
1237        })
1238        .collect()
1239}
1240
1241fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1242    defaults: &BTreeMap<String, String>,
1243    remote: Option<&BTreeMap<String, String>>,
1244    cfg: &mz_dyncfg::Config<T>,
1245) -> T::ConfigType {
1246    let mut val = T::into_config_type(cfg.default().clone());
1247    let get_fn = |map: &BTreeMap<String, String>| {
1248        let val = map.get(cfg.name())?;
1249        match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1250            Ok(x) => Some(x),
1251            Err(err) => {
1252                tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1253                None
1254            }
1255        }
1256    };
1257    if let Some(x) = get_fn(defaults) {
1258        val = x;
1259    }
1260    if let Some(x) = remote.and_then(get_fn) {
1261        val = x;
1262    }
1263    val
1264}