mz_adapter/catalog/
open.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to opening a [`Catalog`].
11
12mod builtin_item_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use futures::future::{BoxFuture, FutureExt};
19use itertools::{Either, Itertools};
20use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
21use mz_adapter_types::dyncfgs::{
22    ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE, FORCE_SWAP_FOR_CC_SIZES,
23};
24use mz_auth::hash::scram256_hash;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27    BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
28    Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
29};
30use mz_catalog::config::StateConfig;
31use mz_catalog::durable::objects::{
32    SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
33};
34use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
35use mz_catalog::expr_cache::{
36    ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
37};
38use mz_catalog::memory::error::{Error, ErrorKind};
39use mz_catalog::memory::objects::{
40    BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
41};
42use mz_controller::clusters::{ReplicaLocation, ReplicaLogging};
43use mz_controller_types::ClusterId;
44use mz_ore::cast::usize_to_u64;
45use mz_ore::collections::HashSet;
46use mz_ore::now::{SYSTEM_TIME, to_datetime};
47use mz_ore::{instrument, soft_assert_no_log};
48use mz_repr::adt::mz_acl_item::PrivilegeMap;
49use mz_repr::namespaces::is_unstable_schema;
50use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
51use mz_sql::catalog::{
52    BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
53};
54use mz_sql::func::OP_IMPLS;
55use mz_sql::names::CommentObjectId;
56use mz_sql::rbac;
57use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
58use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
59use mz_storage_client::storage_collections::StorageCollections;
60use timely::Container;
61use tracing::{Instrument, info, warn};
62use uuid::Uuid;
63
64// DO NOT add any more imports from `crate` outside of `crate::catalog`.
65use crate::AdapterError;
66use crate::catalog::open::builtin_item_migration::{
67    BuiltinItemMigrationResult, migrate_builtin_items,
68};
69use crate::catalog::state::LocalExpressionCache;
70use crate::catalog::{
71    BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name, migrate,
72};
73
74pub struct InitializeStateResult {
75    /// An initialized [`CatalogState`].
76    pub state: CatalogState,
77    /// A set of new shards that may need to be initialized (only used by 0dt migration).
78    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
79    /// A set of new builtin items.
80    pub new_builtin_collections: BTreeSet<GlobalId>,
81    /// A list of builtin table updates corresponding to the initialized state.
82    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
83    /// The version of the catalog that existed before initializing the catalog.
84    pub last_seen_version: String,
85    /// A handle to the expression cache if it's enabled.
86    pub expr_cache_handle: Option<ExpressionCacheHandle>,
87    /// The global expressions that were cached in `expr_cache_handle`.
88    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
89    /// The local expressions that were NOT cached in `expr_cache_handle`.
90    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
91}
92
93pub struct OpenCatalogResult {
94    /// An opened [`Catalog`].
95    pub catalog: Catalog,
96    /// A set of new shards that may need to be initialized.
97    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
98    /// A set of new builtin items.
99    pub new_builtin_collections: BTreeSet<GlobalId>,
100    /// A list of builtin table updates corresponding to the initialized state.
101    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
102    /// The global expressions that were cached in the expression cache.
103    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
104    /// The local expressions that were NOT cached in the expression cache.
105    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
106}
107
108impl Catalog {
109    /// Initializes a CatalogState. Separate from [`Catalog::open`] to avoid depending on state
110    /// external to a [mz_catalog::durable::DurableCatalogState]
111    /// (for example: no [mz_secrets::SecretsReader]).
112    pub async fn initialize_state<'a>(
113        config: StateConfig,
114        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
115    ) -> Result<InitializeStateResult, AdapterError> {
116        for builtin_role in BUILTIN_ROLES {
117            assert!(
118                is_reserved_name(builtin_role.name),
119                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
120                BUILTIN_PREFIXES.join(", ")
121            );
122        }
123        for builtin_cluster in BUILTIN_CLUSTERS {
124            assert!(
125                is_reserved_name(builtin_cluster.name),
126                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
127                BUILTIN_PREFIXES.join(", ")
128            );
129        }
130
131        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
132        if config.all_features {
133            system_configuration.enable_all_feature_flags_by_default();
134        }
135
136        let mut state = CatalogState {
137            database_by_name: BTreeMap::new(),
138            database_by_id: BTreeMap::new(),
139            entry_by_id: BTreeMap::new(),
140            entry_by_global_id: BTreeMap::new(),
141            ambient_schemas_by_name: BTreeMap::new(),
142            ambient_schemas_by_id: BTreeMap::new(),
143            clusters_by_name: BTreeMap::new(),
144            clusters_by_id: BTreeMap::new(),
145            roles_by_name: BTreeMap::new(),
146            roles_by_id: BTreeMap::new(),
147            network_policies_by_id: BTreeMap::new(),
148            role_auth_by_id: BTreeMap::new(),
149            network_policies_by_name: BTreeMap::new(),
150            system_configuration,
151            default_privileges: DefaultPrivileges::default(),
152            system_privileges: PrivilegeMap::default(),
153            comments: CommentsMap::default(),
154            source_references: BTreeMap::new(),
155            storage_metadata: Default::default(),
156            temporary_schemas: BTreeMap::new(),
157            config: mz_sql::catalog::CatalogConfig {
158                start_time: to_datetime((config.now)()),
159                start_instant: Instant::now(),
160                nonce: rand::random(),
161                environment_id: config.environment_id,
162                session_id: Uuid::new_v4(),
163                build_info: config.build_info,
164                timestamp_interval: Duration::from_secs(1),
165                now: config.now.clone(),
166                connection_context: config.connection_context,
167                builtins_cfg: BuiltinsConfig {
168                    // This will fall back to the default in code (false) if we timeout on the
169                    // initial LD sync. We're just using this to get some additional testing in on
170                    // CTs so a false negative is fine, we're only worried about false positives.
171                    include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
172                        &config.system_parameter_defaults,
173                        config.remote_system_parameters.as_ref(),
174                        &ENABLE_CONTINUAL_TASK_BUILTINS,
175                    ),
176                },
177                helm_chart_version: config.helm_chart_version,
178            },
179            cluster_replica_sizes: config.cluster_replica_sizes,
180            availability_zones: config.availability_zones,
181            egress_addresses: config.egress_addresses,
182            aws_principal_context: config.aws_principal_context,
183            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
184            http_host_name: config.http_host_name,
185            license_key: config.license_key,
186        };
187
188        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
189        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
190        let mut txn = storage.transaction().await?;
191
192        // Migrate/update durable data before we start loading the in-memory catalog.
193        let (migrated_builtins, new_builtin_collections) = {
194            migrate::durable_migrate(
195                &mut txn,
196                state.config.environment_id.organization_id(),
197                config.boot_ts,
198            )?;
199            // Overwrite and persist selected parameter values in `remote_system_parameters` that
200            // was pulled from a remote frontend (e.g. LaunchDarkly) if present.
201            if let Some(remote_system_parameters) = config.remote_system_parameters {
202                for (name, value) in remote_system_parameters {
203                    txn.upsert_system_config(&name, value)?;
204                }
205                txn.set_system_config_synced_once()?;
206            }
207            // Add any new builtin objects and remove old ones.
208            let (migrated_builtins, new_builtin_collections) =
209                add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
210            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
211                system_cluster: config.builtin_system_cluster_config,
212                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
213                probe_cluster: config.builtin_probe_cluster_config,
214                support_cluster: config.builtin_support_cluster_config,
215                analytics_cluster: config.builtin_analytics_cluster_config,
216            };
217            add_new_remove_old_builtin_clusters_migration(
218                &mut txn,
219                &builtin_bootstrap_cluster_config_map,
220            )?;
221            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
222            add_new_remove_old_builtin_cluster_replicas_migration(
223                &mut txn,
224                &builtin_bootstrap_cluster_config_map,
225            )?;
226            add_new_remove_old_builtin_roles_migration(&mut txn)?;
227            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
228            (migrated_builtins, new_builtin_collections)
229        };
230        remove_pending_cluster_replicas_migration(&mut txn)?;
231
232        let op_updates = txn.get_and_commit_op_updates();
233        updates.extend(op_updates);
234
235        // Seed the in-memory catalog with values that don't come from the durable catalog.
236        {
237            // Set defaults from configuration passed in the provided `system_parameter_defaults`
238            // map.
239            for (name, value) in config.system_parameter_defaults {
240                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
241                    Ok(_) => (),
242                    Err(Error {
243                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
244                    }) => {
245                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
246                    }
247                    Err(e) => return Err(e.into()),
248                };
249            }
250            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
251            if let Some(password) = config.external_login_password_mz_system {
252                state.role_auth_by_id.insert(
253                    MZ_SYSTEM_ROLE_ID,
254                    RoleAuth {
255                        role_id: MZ_SYSTEM_ROLE_ID,
256                        password_hash: Some(scram256_hash(&password).map_err(|_| {
257                            AdapterError::Internal("Failed to hash mz_system password.".to_owned())
258                        })?),
259                        updated_at: SYSTEM_TIME(),
260                    },
261                );
262            }
263        }
264
265        let mut builtin_table_updates = Vec::new();
266
267        // Make life easier by consolidating all updates, so that we end up with only positive
268        // diffs.
269        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
270        differential_dataflow::consolidation::consolidate_updates(&mut updates);
271        soft_assert_no_log!(
272            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
273            "consolidated updates should be positive during startup: {updates:?}"
274        );
275
276        let mut pre_item_updates = Vec::new();
277        let mut system_item_updates = Vec::new();
278        let mut item_updates = Vec::new();
279        let mut post_item_updates = Vec::new();
280        let mut audit_log_updates = Vec::new();
281        for (kind, ts, diff) in updates {
282            match kind {
283                BootstrapStateUpdateKind::Role(_)
284                | BootstrapStateUpdateKind::RoleAuth(_)
285                | BootstrapStateUpdateKind::Database(_)
286                | BootstrapStateUpdateKind::Schema(_)
287                | BootstrapStateUpdateKind::DefaultPrivilege(_)
288                | BootstrapStateUpdateKind::SystemPrivilege(_)
289                | BootstrapStateUpdateKind::SystemConfiguration(_)
290                | BootstrapStateUpdateKind::Cluster(_)
291                | BootstrapStateUpdateKind::NetworkPolicy(_)
292                | BootstrapStateUpdateKind::ClusterReplica(_) => {
293                    pre_item_updates.push(StateUpdate {
294                        kind: kind.into(),
295                        ts,
296                        diff: diff.try_into().expect("valid diff"),
297                    })
298                }
299                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
300                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
301                    system_item_updates.push(StateUpdate {
302                        kind: kind.into(),
303                        ts,
304                        diff: diff.try_into().expect("valid diff"),
305                    })
306                }
307                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
308                    kind: kind.into(),
309                    ts,
310                    diff: diff.try_into().expect("valid diff"),
311                }),
312                BootstrapStateUpdateKind::Comment(_)
313                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
314                | BootstrapStateUpdateKind::SourceReferences(_)
315                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
316                    post_item_updates.push((kind, ts, diff));
317                }
318                BootstrapStateUpdateKind::AuditLog(_) => {
319                    audit_log_updates.push(StateUpdate {
320                        kind: kind.into(),
321                        ts,
322                        diff: diff.try_into().expect("valid diff"),
323                    });
324                }
325            }
326        }
327
328        let builtin_table_update = state
329            .apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
330            .await;
331        builtin_table_updates.extend(builtin_table_update);
332
333        let expr_cache_start = Instant::now();
334        info!("startup: coordinator init: catalog open: expr cache open beginning");
335        // We wait until after the `pre_item_updates` to open the cache so we can get accurate
336        // dyncfgs because the `pre_item_updates` contains `SystemConfiguration` updates.
337        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
338        let expr_cache_enabled = config.enable_0dt_deployment
339            && config
340                .enable_expression_cache_override
341                .unwrap_or(enable_expr_cache_dyncfg);
342        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
343            info!(
344                ?config.enable_0dt_deployment,
345                ?config.enable_expression_cache_override,
346                ?enable_expr_cache_dyncfg,
347                "using expression cache for startup"
348            );
349            let current_ids = txn
350                .get_items()
351                .flat_map(|item| {
352                    let gid = item.global_id.clone();
353                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
354                    std::iter::once(gid).chain(gids.into_iter())
355                })
356                .chain(
357                    txn.get_system_object_mappings()
358                        .map(|som| som.unique_identifier.global_id),
359                )
360                .collect();
361            let dyncfgs = config.persist_client.dyncfgs().clone();
362            let build_version = if config.build_info.is_dev() {
363                // A single dev version can be used for many different builds, so we need to use
364                // the build version that is also enriched with build metadata.
365                config
366                    .build_info
367                    .semver_version_build()
368                    .expect("build ID is not available on your platform!")
369            } else {
370                config.build_info.semver_version()
371            };
372            let expr_cache_config = ExpressionCacheConfig {
373                build_version,
374                shard_id: txn
375                    .get_expression_cache_shard()
376                    .expect("expression cache shard should exist for opened catalogs"),
377                persist: config.persist_client,
378                current_ids,
379                remove_prior_versions: !config.read_only,
380                compact_shard: config.read_only,
381                dyncfgs,
382            };
383            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
384                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
385            (
386                Some(expr_cache_handle),
387                cached_local_exprs,
388                cached_global_exprs,
389            )
390        } else {
391            (None, BTreeMap::new(), BTreeMap::new())
392        };
393        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
394        info!(
395            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
396            expr_cache_start.elapsed()
397        );
398
399        let builtin_table_update = state
400            .apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
401            .await;
402        builtin_table_updates.extend(builtin_table_update);
403
404        let last_seen_version = txn
405            .get_catalog_content_version()
406            .unwrap_or("new")
407            .to_string();
408
409        // Migrate item ASTs.
410        let builtin_table_update = if !config.skip_migrations {
411            let migrate_result = migrate::migrate(
412                &mut state,
413                &mut txn,
414                &mut local_expr_cache,
415                item_updates,
416                config.now,
417                config.boot_ts,
418            )
419            .await
420            .map_err(|e| {
421                Error::new(ErrorKind::FailedMigration {
422                    last_seen_version: last_seen_version.clone(),
423                    this_version: config.build_info.version,
424                    cause: e.to_string(),
425                })
426            })?;
427            if !migrate_result.post_item_updates.is_empty() {
428                // Include any post-item-updates generated by migrations, and then consolidate
429                // them to ensure diffs are all positive.
430                post_item_updates.extend(migrate_result.post_item_updates);
431                // Push everything to the same timestamp so it consolidates cleanly.
432                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
433                    for (_, ts, _) in &mut post_item_updates {
434                        *ts = max_ts;
435                    }
436                }
437                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
438            }
439
440            migrate_result.builtin_table_updates
441        } else {
442            state
443                .apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
444                .await
445        };
446        builtin_table_updates.extend(builtin_table_update);
447
448        let post_item_updates = post_item_updates
449            .into_iter()
450            .map(|(kind, ts, diff)| StateUpdate {
451                kind: kind.into(),
452                ts,
453                diff: diff.try_into().expect("valid diff"),
454            })
455            .collect();
456        let builtin_table_update = state
457            .apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
458            .await;
459        builtin_table_updates.extend(builtin_table_update);
460
461        // We don't need to apply the audit logs in memory, yet apply can be expensive when the
462        // audit log grows large. Therefore, we skip the apply step and just generate the builtin
463        // updates.
464        for audit_log_update in audit_log_updates {
465            builtin_table_updates.extend(
466                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
467            );
468        }
469
470        // Migrate builtin items.
471        let BuiltinItemMigrationResult {
472            builtin_table_updates: builtin_table_update,
473            migrated_storage_collections_0dt,
474            cleanup_action,
475        } = migrate_builtin_items(
476            &mut state,
477            &mut txn,
478            &mut local_expr_cache,
479            migrated_builtins,
480            config.builtin_item_migration_config,
481        )
482        .await?;
483        builtin_table_updates.extend(builtin_table_update);
484        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);
485
486        txn.commit(config.boot_ts).await?;
487
488        cleanup_action.await;
489
490        // Force cc sizes to use swap, if requested.
491        if FORCE_SWAP_FOR_CC_SIZES.get(state.system_configuration.dyncfgs()) {
492            info!("force-enabling swap for cc replica sizes");
493
494            for size in state.cluster_replica_sizes.0.values_mut() {
495                if size.is_cc {
496                    size.swap_enabled = true;
497                    size.cpu_exclusive = false;
498                    size.selectors.remove("materialize.cloud/scratch-fs");
499                    size.selectors
500                        .insert("materialize.cloud/swap".into(), "true".into());
501                }
502            }
503
504            // The size definitions were copied into the in-memory replica configs during
505            // `CatalogState` initialization above. We can't move this code before that because we
506            // need to wait for the `system_configuration` to be fully initialized. Instead we also
507            // patch the replica configs here.
508            for cluster in state.clusters_by_id.values_mut() {
509                for replica in cluster.replicas_by_id_.values_mut() {
510                    if let ReplicaLocation::Managed(loc) = &mut replica.config.location {
511                        let alloc = &mut loc.allocation;
512                        if alloc.is_cc {
513                            alloc.swap_enabled = true;
514                            alloc.cpu_exclusive = false;
515                            alloc.selectors.remove("materialize.cloud/scratch-fs");
516                            alloc
517                                .selectors
518                                .insert("materialize.cloud/swap".into(), "true".into());
519                        }
520                    }
521                }
522            }
523        }
524
525        Ok(InitializeStateResult {
526            state,
527            migrated_storage_collections_0dt,
528            new_builtin_collections: new_builtin_collections.into_iter().collect(),
529            builtin_table_updates,
530            last_seen_version,
531            expr_cache_handle,
532            cached_global_exprs,
533            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
534        })
535    }
536
537    /// Opens or creates a catalog that stores data at `path`.
538    ///
539    /// Returns the catalog, metadata about builtin objects that have changed
540    /// schemas since last restart, a list of updates to builtin tables that
541    /// describe the initial state of the catalog, and the version of the
542    /// catalog before any migrations were performed.
543    ///
544    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 17KB. This would
545    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
546    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
547    #[instrument(name = "catalog::open")]
548    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
549        async move {
550            let mut storage = config.storage;
551
552            let InitializeStateResult {
553                state,
554                migrated_storage_collections_0dt,
555                new_builtin_collections,
556                mut builtin_table_updates,
557                last_seen_version: _,
558                expr_cache_handle,
559                cached_global_exprs,
560                uncached_local_exprs,
561            } =
562                // BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
563                // get stored on the stack which is bad for runtime performance, and blow up our stack usage.
564                // Because of that we purposefully move this Future onto the heap (i.e. Box it).
565                Self::initialize_state(config.state, &mut storage)
566                    .instrument(tracing::info_span!("catalog::initialize_state"))
567                    .boxed()
568                    .await?;
569
570            let catalog = Catalog {
571                state,
572                plans: CatalogPlans::default(),
573                expr_cache_handle,
574                transient_revision: 1,
575                storage: Arc::new(tokio::sync::Mutex::new(storage)),
576            };
577
578            // Operators aren't stored in the catalog, but we would like them in
579            // introspection views.
580            for (op, func) in OP_IMPLS.iter() {
581                match func {
582                    mz_sql::func::Func::Scalar(impls) => {
583                        for imp in impls {
584                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
585                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
586                            ));
587                        }
588                    }
589                    _ => unreachable!("all operators must be scalar functions"),
590                }
591            }
592
593            for ip in &catalog.state.egress_addresses {
594                builtin_table_updates.push(
595                    catalog
596                        .state
597                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
598                );
599            }
600
601            if !catalog.state.license_key.id.is_empty() {
602                builtin_table_updates.push(
603                    catalog.state.resolve_builtin_table_update(
604                        catalog
605                            .state
606                            .pack_license_key_update(&catalog.state.license_key)?,
607                    ),
608                );
609            }
610
611            catalog.storage().await.mark_bootstrap_complete();
612
613            Ok(OpenCatalogResult {
614                catalog,
615                migrated_storage_collections_0dt,
616                new_builtin_collections,
617                builtin_table_updates,
618                cached_global_exprs,
619                uncached_local_exprs,
620            })
621        }
622        .instrument(tracing::info_span!("catalog::open"))
623        .boxed()
624    }
625
626    /// Initializes STORAGE to understand all shards that `self` expects to
627    /// exist.
628    ///
629    /// Note that this must be done before creating/rendering collections
630    /// because the storage controller might not be aware of new system
631    /// collections created between versions.
632    async fn initialize_storage_state(
633        &mut self,
634        storage_collections: &Arc<
635            dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
636        >,
637    ) -> Result<(), mz_catalog::durable::CatalogError> {
638        let collections = self
639            .entries()
640            .filter(|entry| entry.item().is_storage_collection())
641            .flat_map(|entry| entry.global_ids())
642            .collect();
643
644        // Clone the state so that any errors that occur do not leak any
645        // transformations on error.
646        let mut state = self.state.clone();
647
648        let mut storage = self.storage().await;
649        let mut txn = storage.transaction().await?;
650
651        storage_collections
652            .initialize_state(&mut txn, collections)
653            .await
654            .map_err(mz_catalog::durable::DurableCatalogError::from)?;
655
656        let updates = txn.get_and_commit_op_updates();
657        let builtin_updates = state.apply_updates(updates)?;
658        assert!(builtin_updates.is_empty());
659        let commit_ts = txn.upper();
660        txn.commit(commit_ts).await?;
661        drop(storage);
662
663        // Save updated state.
664        self.state = state;
665        Ok(())
666    }
667
668    /// [`mz_controller::Controller`] depends on durable catalog state to boot,
669    /// so make it available and initialize the controller.
670    pub async fn initialize_controller(
671        &mut self,
672        config: mz_controller::ControllerConfig,
673        envd_epoch: core::num::NonZeroI64,
674        read_only: bool,
675    ) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
676    {
677        let controller_start = Instant::now();
678        info!("startup: controller init: beginning");
679
680        let controller = {
681            let mut storage = self.storage().await;
682            let mut tx = storage.transaction().await?;
683            mz_controller::prepare_initialization(&mut tx)
684                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
685            let updates = tx.get_and_commit_op_updates();
686            assert!(
687                updates.is_empty(),
688                "initializing controller should not produce updates: {updates:?}"
689            );
690            let commit_ts = tx.upper();
691            tx.commit(commit_ts).await?;
692
693            let read_only_tx = storage.transaction().await?;
694
695            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
696        };
697
698        self.initialize_storage_state(&controller.storage_collections)
699            .await?;
700
701        info!(
702            "startup: controller init: complete in {:?}",
703            controller_start.elapsed()
704        );
705
706        Ok(controller)
707    }
708
709    /// Politely releases all external resources that can only be released in an async context.
710    pub async fn expire(self) {
711        // If no one else holds a reference to storage, then clean up the storage resources.
712        // Otherwise, hopefully the other reference cleans up the resources when it's dropped.
713        if let Some(storage) = Arc::into_inner(self.storage) {
714            let storage = storage.into_inner();
715            storage.expire().await;
716        }
717    }
718}
719
720impl CatalogState {
721    /// Set the default value for `name`, which is the value it will be reset to.
722    fn set_system_configuration_default(
723        &mut self,
724        name: &str,
725        value: VarInput,
726    ) -> Result<(), Error> {
727        Ok(self.system_configuration.set_default(name, value)?)
728    }
729}
730
/// Updates the durable catalog with new and removed builtin items.
///
/// Compares the builtin items compiled into this binary against the system object mappings
/// persisted in `txn`:
///
///   - builtins without a persisted mapping are allocated new IDs and get a mapping (and, if
///     they are runtime-alterable, a durable item entry);
///   - builtins whose persisted fingerprint differs from the current one are reported as
///     needing migration;
///   - persisted mappings without a corresponding builtin are removed, along with their
///     comments and, for runtime-alterable objects, their durable item entry.
///
/// Column comments of every builtin table, source, and view are dropped and rewritten from the
/// compiled-in definitions.
///
/// Returns the [`CatalogItemId`]s of existing builtin items that need to be migrated, and the
/// [`GlobalId`]s of the newly added builtin items.
///
/// # Panics
///
/// Panics if two builtins share a description; if a builtin that may not be migrated
/// (`mz_storage_usage_by_shard`, a type, or a change to the runtime-alterable flag) has a
/// changed fingerprint; if a new runtime-alterable builtin is of a kind this function does not
/// persist; if a builtin declares a comment for a column it does not have; or if a deleted
/// object (other than an index) lives outside the unstable schemas and is not in
/// `delete_exceptions`.
fn add_new_remove_old_builtin_items_migration(
    builtins_cfg: &BuiltinsConfig,
    txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<(Vec<CatalogItemId>, Vec<GlobalId>), mz_catalog::durable::CatalogError> {
    let mut new_builtin_mappings = Vec::new();
    let mut migrated_builtin_ids = Vec::new();
    // Used to validate unique descriptions.
    let mut builtin_descs = HashSet::new();

    // We compare the builtin items that are compiled into the binary with the builtin items that
    // are persisted in the catalog to discover new, deleted, and migrated builtin items.
    let mut builtins = Vec::new();
    for builtin in BUILTINS::iter(builtins_cfg) {
        let desc = SystemObjectDescription {
            schema_name: builtin.schema().to_string(),
            object_type: builtin.catalog_item_type(),
            object_name: builtin.name().to_string(),
        };
        // Validate that the description is unique.
        if !builtin_descs.insert(desc.clone()) {
            panic!(
                "duplicate builtin description: {:?}, {:?}",
                SystemObjectDescription {
                    schema_name: builtin.schema().to_string(),
                    object_type: builtin.catalog_item_type(),
                    object_name: builtin.name().to_string(),
                },
                builtin
            );
        }
        builtins.push((desc, builtin));
    }

    // Persisted mappings keyed by description. Entries are removed as they are matched to a
    // compiled-in builtin below, so whatever remains afterwards has no builtin counterpart.
    let mut system_object_mappings: BTreeMap<_, _> = txn
        .get_system_object_mappings()
        .map(|system_object_mapping| {
            (
                system_object_mapping.description.clone(),
                system_object_mapping,
            )
        })
        .collect();

    // Split builtins into those with an existing persisted mapping (Left) and brand-new ones
    // (Right). Runtime-alterable builtins always use the sentinel fingerprint.
    let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
        builtins.into_iter().partition_map(|(desc, builtin)| {
            let fingerprint = match builtin.runtime_alterable() {
                false => builtin.fingerprint(),
                true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
            };
            match system_object_mappings.remove(&desc) {
                Some(system_object_mapping) => {
                    Either::Left((builtin, system_object_mapping, fingerprint))
                }
                None => Either::Right((builtin, fingerprint)),
            }
        });
    // Allocate IDs for all new builtins in one batch and pair each builtin with its IDs.
    let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
    let new_builtins: Vec<_> = new_builtins
        .into_iter()
        .zip(new_builtin_ids.clone())
        .collect();

    // Look for migrated builtins.
    for (builtin, system_object_mapping, fingerprint) in existing_builtins.iter().cloned() {
        if system_object_mapping.unique_identifier.fingerprint != fingerprint {
            // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly,
            // it would cause the table to be truncated because the contents are not also
            // stored in the durable catalog. Secondly, we prune `mz_storage_usage_by_shard`
            // of old events in the background on startup. The correctness of that pruning
            // relies on there being no other retractions to `mz_storage_usage_by_shard`.
            assert_ne!(
                *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, system_object_mapping.description,
                "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
            );
            assert_ne!(
                builtin.catalog_item_type(),
                CatalogItemType::Type,
                "types cannot be migrated"
            );
            assert_ne!(
                system_object_mapping.unique_identifier.fingerprint,
                RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
                "clearing the runtime alterable flag on an existing object is not permitted",
            );
            assert!(
                !builtin.runtime_alterable(),
                "setting the runtime alterable flag on an existing object is not permitted"
            );
            migrated_builtin_ids.push(system_object_mapping.unique_identifier.catalog_id);
        }
    }

    // Add new builtin items to catalog.
    for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
        new_builtin_mappings.push(SystemObjectMapping {
            description: SystemObjectDescription {
                schema_name: builtin.schema().to_string(),
                object_type: builtin.catalog_item_type(),
                object_name: builtin.name().to_string(),
            },
            unique_identifier: SystemObjectUniqueIdentifier {
                catalog_id,
                global_id,
                fingerprint,
            },
        });

        // Runtime-alterable system objects are durably recorded to the
        // usual items collection, so that they can be later altered at
        // runtime by their owner (i.e., outside of the usual builtin
        // migration framework that requires changes to the binary
        // itself).
        let handled_runtime_alterable = match builtin {
            Builtin::Connection(c) if c.runtime_alterable => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    c.owner_id.clone(),
                )];
                acl_items.extend_from_slice(c.access);
                // Builtin Connections cannot be versioned.
                let versions = BTreeMap::new();

                txn.insert_item(
                    catalog_id,
                    c.oid,
                    global_id,
                    mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
                    c.name,
                    c.sql.into(),
                    *c.owner_id,
                    acl_items,
                    versions,
                )?;
                true
            }
            _ => false,
        };
        // Any runtime-alterable builtin kind not matched above would otherwise be silently
        // left out of the items collection.
        assert_eq!(
            builtin.runtime_alterable(),
            handled_runtime_alterable,
            "runtime alterable object was not handled by migration",
        );
    }
    txn.set_system_object_mappings(new_builtin_mappings)?;

    // Update comments of all builtin objects
    let builtins_with_catalog_ids = existing_builtins
        .iter()
        .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
        .chain(
            new_builtins
                .into_iter()
                .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
        );

    for (builtin, id) in builtins_with_catalog_ids {
        let (comment_id, desc, comments) = match builtin {
            Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
            Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
            Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
            Builtin::Log(_)
            | Builtin::Type(_)
            | Builtin::Func(_)
            | Builtin::ContinualTask(_)
            | Builtin::Index(_)
            | Builtin::Connection(_) => continue,
        };
        // Drop whatever comments are persisted for this object and rewrite them from the
        // compiled-in definition.
        txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;

        let mut comments = comments.clone();
        for (col_idx, name) in desc.iter_names().enumerate() {
            if let Some(comment) = comments.remove(name.as_str()) {
                // Comment column indices are 1 based
                txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
            }
        }
        assert!(
            comments.is_empty(),
            "builtin object contains dangling comments that don't correspond to columns {comments:?}"
        );
    }

    // Anything left in `system_object_mappings` must have been deleted and should be removed from
    // the catalog.
    let mut deleted_system_objects = BTreeSet::new();
    let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
    let mut deleted_comments = BTreeSet::new();
    for (desc, mapping) in system_object_mappings {
        deleted_system_objects.insert(mapping.description);
        // Runtime-alterable objects also have an entry in the items collection that must go.
        if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
            deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
        }

        let id = mapping.unique_identifier.catalog_id;
        let comment_id = match desc.object_type {
            CatalogItemType::Table => CommentObjectId::Table(id),
            CatalogItemType::Source => CommentObjectId::Source(id),
            CatalogItemType::View => CommentObjectId::View(id),
            CatalogItemType::Sink
            | CatalogItemType::MaterializedView
            | CatalogItemType::Index
            | CatalogItemType::Type
            | CatalogItemType::Func
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => continue,
        };
        deleted_comments.insert(comment_id);
    }
    // If you are 100% positive that it is safe to delete a system object outside any of the
    // unstable schemas, then add it to this set. Make sure that no prod environments are
    // using this object and that the upgrade checker does not show any issues.
    //
    // Objects can be removed from this set after one release.
    let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
    // TODO(jkosh44) Technically we could support changing the type of a builtin object outside
    // of unstable schemas (i.e. from a table to a view). However, builtin migrations don't currently
    // handle that scenario correctly.
    assert!(
        deleted_system_objects
            .iter()
            // It's okay if Indexes change because they're inherently ephemeral.
            .filter(|object| object.object_type != CatalogItemType::Index)
            .all(
                |deleted_object| is_unstable_schema(&deleted_object.schema_name)
                    || delete_exceptions.contains(deleted_object)
            ),
        "only objects in unstable schemas can be deleted, deleted objects: {:?}",
        deleted_system_objects
    );
    txn.drop_comments(&deleted_comments)?;
    txn.remove_items(&deleted_runtime_alterable_system_ids)?;
    txn.remove_system_object_mappings(deleted_system_objects)?;

    // Keep only the `GlobalId` of each new builtin (dropping the `CatalogItemId`); these are
    // what is used to track the underlying collections.
    let new_builtin_collections = new_builtin_ids
        .into_iter()
        .map(|(_catalog_id, global_id)| global_id)
        .collect();

    Ok((migrated_builtin_ids, new_builtin_collections))
}
977
978fn add_new_remove_old_builtin_clusters_migration(
979    txn: &mut mz_catalog::durable::Transaction<'_>,
980    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
981) -> Result<(), mz_catalog::durable::CatalogError> {
982    let mut durable_clusters: BTreeMap<_, _> = txn
983        .get_clusters()
984        .filter(|cluster| cluster.id.is_system())
985        .map(|cluster| (cluster.name.to_string(), cluster))
986        .collect();
987
988    // Add new clusters.
989    for builtin_cluster in BUILTIN_CLUSTERS {
990        if durable_clusters.remove(builtin_cluster.name).is_none() {
991            let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
992
993            txn.insert_system_cluster(
994                builtin_cluster.name,
995                vec![],
996                builtin_cluster.privileges.to_vec(),
997                builtin_cluster.owner_id.to_owned(),
998                mz_catalog::durable::ClusterConfig {
999                    variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
1000                        size: cluster_config.size,
1001                        availability_zones: vec![],
1002                        replication_factor: cluster_config.replication_factor,
1003                        logging: default_logging_config(),
1004                        optimizer_feature_overrides: Default::default(),
1005                        schedule: Default::default(),
1006                    }),
1007                    workload_class: None,
1008                },
1009                &HashSet::new(),
1010            )?;
1011        }
1012    }
1013
1014    // Remove old clusters.
1015    let old_clusters = durable_clusters
1016        .values()
1017        .map(|cluster| cluster.id)
1018        .collect();
1019    txn.remove_clusters(&old_clusters)?;
1020
1021    Ok(())
1022}
1023
1024fn add_new_remove_old_builtin_introspection_source_migration(
1025    txn: &mut mz_catalog::durable::Transaction<'_>,
1026) -> Result<(), AdapterError> {
1027    let mut new_indexes = Vec::new();
1028    let mut removed_indexes = BTreeSet::new();
1029    for cluster in txn.get_clusters() {
1030        let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
1031
1032        let mut new_logs = Vec::new();
1033
1034        for log in BUILTINS::logs() {
1035            if introspection_source_index_ids.remove(log.name).is_none() {
1036                new_logs.push(log);
1037            }
1038        }
1039
1040        for log in new_logs {
1041            let (item_id, gid) =
1042                Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1043            new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1044        }
1045
1046        // Anything left in `introspection_source_index_ids` must have been deleted and should be
1047        // removed from the catalog.
1048        removed_indexes.extend(
1049            introspection_source_index_ids
1050                .into_keys()
1051                .map(|name| (cluster.id, name.to_string())),
1052        );
1053    }
1054    txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1055    txn.remove_introspection_source_indexes(removed_indexes)?;
1056    Ok(())
1057}
1058
1059fn add_new_remove_old_builtin_roles_migration(
1060    txn: &mut mz_catalog::durable::Transaction<'_>,
1061) -> Result<(), mz_catalog::durable::CatalogError> {
1062    let mut durable_roles: BTreeMap<_, _> = txn
1063        .get_roles()
1064        .filter(|role| role.id.is_system() || role.id.is_predefined())
1065        .map(|role| (role.name.to_string(), role))
1066        .collect();
1067
1068    // Add new roles.
1069    for builtin_role in BUILTIN_ROLES {
1070        if durable_roles.remove(builtin_role.name).is_none() {
1071            txn.insert_builtin_role(
1072                builtin_role.id,
1073                builtin_role.name.to_string(),
1074                builtin_role.attributes.clone(),
1075                RoleMembership::new(),
1076                RoleVars::default(),
1077                builtin_role.oid,
1078            )?;
1079        }
1080    }
1081
1082    // Remove old roles.
1083    let old_roles = durable_roles.values().map(|role| role.id).collect();
1084    txn.remove_roles(&old_roles)?;
1085
1086    Ok(())
1087}
1088
1089fn add_new_remove_old_builtin_cluster_replicas_migration(
1090    txn: &mut Transaction<'_>,
1091    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1092) -> Result<(), AdapterError> {
1093    let cluster_lookup: BTreeMap<_, _> = txn
1094        .get_clusters()
1095        .map(|cluster| (cluster.name.clone(), cluster.clone()))
1096        .collect();
1097
1098    let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1099        .get_cluster_replicas()
1100        .filter(|replica| replica.replica_id.is_system())
1101        .fold(BTreeMap::new(), |mut acc, replica| {
1102            acc.entry(replica.cluster_id)
1103                .or_insert_with(BTreeMap::new)
1104                .insert(replica.name.to_string(), replica);
1105            acc
1106        });
1107
1108    // Add new replicas.
1109    for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1110        let cluster = cluster_lookup
1111            .get(builtin_replica.cluster_name)
1112            .expect("builtin cluster replica references non-existent cluster");
1113        // `empty_map` is a hack to simplify the if statement below.
1114        let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1115        let replica_names = durable_replicas
1116            .get_mut(&cluster.id)
1117            .unwrap_or(&mut empty_map);
1118
1119        let builtin_cluster_boostrap_config =
1120            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1121        if replica_names.remove(builtin_replica.name).is_none()
1122            // NOTE(SangJunBak): We need to explicitly check the replication factor because
1123            // BUILT_IN_CLUSTER_REPLICAS is constant throughout all deployments but the replication
1124            // factor is configurable on bootstrap.
1125            && builtin_cluster_boostrap_config.replication_factor > 0
1126        {
1127            let replica_size = match cluster.config.variant {
1128                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1129                ClusterVariant::Unmanaged => builtin_cluster_boostrap_config.size,
1130            };
1131
1132            let config = builtin_cluster_replica_config(replica_size);
1133            txn.insert_cluster_replica(
1134                cluster.id,
1135                builtin_replica.name,
1136                config,
1137                MZ_SYSTEM_ROLE_ID,
1138            )?;
1139        }
1140    }
1141
1142    // Remove old replicas.
1143    let old_replicas = durable_replicas
1144        .values()
1145        .flat_map(|replicas| replicas.values().map(|replica| replica.replica_id))
1146        .collect();
1147    txn.remove_cluster_replicas(&old_replicas)?;
1148
1149    Ok(())
1150}
1151
1152/// Roles can have default values for configuration parameters, e.g. you can set a Role default for
1153/// the 'cluster' parameter.
1154///
1155/// This migration exists to remove the Role default for a configuration parameter, if the persisted
1156/// input is no longer valid. For example if we remove a configuration parameter or change the
1157/// accepted set of values.
1158fn remove_invalid_config_param_role_defaults_migration(
1159    txn: &mut Transaction<'_>,
1160) -> Result<(), AdapterError> {
1161    static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1162
1163    let roles_to_migrate: BTreeMap<_, _> = txn
1164        .get_roles()
1165        .filter_map(|mut role| {
1166            // Create an empty SessionVars just so we can check if a var is valid.
1167            //
1168            // TODO(parkmycar): This is a bit hacky, instead we should have a static list of all
1169            // session variables.
1170            let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1171
1172            // Iterate over all of the variable defaults for this role.
1173            let mut invalid_roles_vars = BTreeMap::new();
1174            for (name, value) in &role.vars.map {
1175                // If one does not exist or its value is invalid, then mark it for removal.
1176                let Ok(session_var) = session_vars.inspect(name) else {
1177                    invalid_roles_vars.insert(name.clone(), value.clone());
1178                    continue;
1179                };
1180                if session_var.check(value.borrow()).is_err() {
1181                    invalid_roles_vars.insert(name.clone(), value.clone());
1182                }
1183            }
1184
1185            // If the role has no invalid values, nothing to do!
1186            if invalid_roles_vars.is_empty() {
1187                return None;
1188            }
1189
1190            tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1191
1192            // Otherwise, remove the variables from the role and return it to be updated.
1193            for (name, _value) in invalid_roles_vars {
1194                role.vars.map.remove(&name);
1195            }
1196            Some(role)
1197        })
1198        .map(|role| (role.id, role))
1199        .collect();
1200
1201    txn.update_roles_without_auth(roles_to_migrate)?;
1202
1203    Ok(())
1204}
1205
1206/// Cluster Replicas may be created ephemerally during an alter statement, these replicas
1207/// are marked as pending and should be cleaned up on catalog opsn.
1208fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
1209    for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1210        if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1211            replica.config.location
1212        {
1213            tx.remove_cluster_replica(replica.replica_id)?;
1214        }
1215    }
1216    Ok(())
1217}
1218
1219pub(crate) fn builtin_cluster_replica_config(
1220    replica_size: String,
1221) -> mz_catalog::durable::ReplicaConfig {
1222    mz_catalog::durable::ReplicaConfig {
1223        location: mz_catalog::durable::ReplicaLocation::Managed {
1224            availability_zone: None,
1225            billed_as: None,
1226            pending: false,
1227            internal: false,
1228            size: replica_size,
1229        },
1230        logging: default_logging_config(),
1231    }
1232}
1233
1234fn default_logging_config() -> ReplicaLogging {
1235    ReplicaLogging {
1236        log_logging: false,
1237        interval: Some(Duration::from_secs(1)),
1238    }
1239}
1240
/// Bootstrap configuration (size and replication factor) for each builtin cluster. It is
/// consulted when a builtin cluster or builtin cluster replica missing from the durable catalog
/// is inserted.
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Size and replication factor to default system_cluster on bootstrap
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default catalog_server_cluster on bootstrap
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default probe_cluster on bootstrap
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default support_cluster on bootstrap
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default analytics_cluster on bootstrap
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1254
1255impl BuiltinBootstrapClusterConfigMap {
1256    /// Gets the size of the builtin cluster based on the provided name
1257    fn get_config(
1258        &self,
1259        cluster_name: &str,
1260    ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1261        let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1262            &self.system_cluster
1263        } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1264            &self.catalog_server_cluster
1265        } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1266            &self.probe_cluster
1267        } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1268            &self.support_cluster
1269        } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1270            &self.analytics_cluster
1271        } else {
1272            return Err(mz_catalog::durable::CatalogError::Catalog(
1273                SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1274            ));
1275        };
1276        Ok(cluster_config.clone())
1277    }
1278}
1279
1280/// Convert `updates` into a `Vec` that can be consolidated by doing the following:
1281///
1282///   - Convert each update into a type that implements [`std::cmp::Ord`].
1283///   - Update the timestamp of each update to the same value.
1284///   - Convert the diff of each update to a type that implements
1285///     [`differential_dataflow::difference::Semigroup`].
1286///
1287/// [`mz_catalog::memory::objects::StateUpdateKind`] does not implement [`std::cmp::Ord`] only
1288/// because it contains a variant for temporary items, which do not implement [`std::cmp::Ord`].
1289/// However, we know that during bootstrap no temporary items exist, because they are not persisted
1290/// and are only created after bootstrap is complete. So we forcibly convert each
1291/// [`mz_catalog::memory::objects::StateUpdateKind`] into an [`BootstrapStateUpdateKind`], which is
1292/// identical to [`mz_catalog::memory::objects::StateUpdateKind`] except it doesn't have a
1293/// temporary item variant and does implement [`std::cmp::Ord`].
1294///
1295/// WARNING: Do not call outside of startup.
1296pub(crate) fn into_consolidatable_updates_startup(
1297    updates: Vec<StateUpdate>,
1298    ts: Timestamp,
1299) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1300    updates
1301        .into_iter()
1302        .map(|StateUpdate { kind, ts: _, diff }| {
1303            let kind: BootstrapStateUpdateKind = kind
1304                .try_into()
1305                .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1306            (kind, ts, Diff::from(diff))
1307        })
1308        .collect()
1309}
1310
1311fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1312    defaults: &BTreeMap<String, String>,
1313    remote: Option<&BTreeMap<String, String>>,
1314    cfg: &mz_dyncfg::Config<T>,
1315) -> T::ConfigType {
1316    let mut val = T::into_config_type(cfg.default().clone());
1317    let get_fn = |map: &BTreeMap<String, String>| {
1318        let val = map.get(cfg.name())?;
1319        match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1320            Ok(x) => Some(x),
1321            Err(err) => {
1322                tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1323                None
1324            }
1325        }
1326    };
1327    if let Some(x) = get_fn(defaults) {
1328        val = x;
1329    }
1330    if let Some(x) = remote.and_then(get_fn) {
1331        val = x;
1332    }
1333    val
1334}