Skip to main content

mz_adapter/catalog/
open.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to opening a [`Catalog`].
11
12mod builtin_schema_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU32;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use futures::future::{BoxFuture, FutureExt};
20use itertools::{Either, Itertools};
21use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
22use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
23use mz_audit_log::{
24    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, ObjectType, VersionedEvent,
25};
26use mz_auth::hash::scram256_hash;
27use mz_catalog::SYSTEM_CONN_ID;
28use mz_catalog::builtin::{
29    BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
30    Fingerprint, MZ_CATALOG_RAW, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
31};
32use mz_catalog::config::StateConfig;
33use mz_catalog::durable::objects::{
34    SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
35};
36use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
37use mz_catalog::expr_cache::{
38    ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
39};
40use mz_catalog::memory::error::{Error, ErrorKind};
41use mz_catalog::memory::objects::{
42    BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
43};
44use mz_controller::clusters::ReplicaLogging;
45use mz_controller_types::ClusterId;
46use mz_ore::cast::usize_to_u64;
47use mz_ore::collections::HashSet;
48use mz_ore::now::{SYSTEM_TIME, to_datetime};
49use mz_ore::{instrument, soft_assert_no_log};
50use mz_repr::adt::mz_acl_item::PrivilegeMap;
51use mz_repr::namespaces::is_unstable_schema;
52use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
53use mz_sql::catalog::{
54    BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
55};
56use mz_sql::func::OP_IMPLS;
57use mz_sql::names::CommentObjectId;
58use mz_sql::rbac;
59use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
60use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
61use mz_storage_client::controller::{StorageMetadata, StorageTxn};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{Instrument, info, warn};
64use uuid::Uuid;
65
66// DO NOT add any more imports from `crate` outside of `crate::catalog`.
67use crate::AdapterError;
68use crate::catalog::migrate::{self, get_migration_version, set_migration_version};
69use crate::catalog::state::LocalExpressionCache;
70use crate::catalog::{BuiltinTableUpdate, Catalog, CatalogState, Config, is_reserved_name};
71
/// The outcome of [`Catalog::initialize_state`]: a fully-loaded in-memory catalog
/// plus the bookkeeping that `open` and the bootstrap sequence need afterwards.
pub struct InitializeStateResult {
    /// An initialized [`CatalogState`].
    pub state: CatalogState,
    /// A set of new shards that may need to be initialized (only used by 0dt migration).
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// A set of new builtin items added during this startup.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// A list of builtin table updates corresponding to the initialized state.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// The version of the catalog that existed before initializing the catalog
    /// (the string `"new"` when no prior version was recorded).
    pub last_seen_version: String,
    /// A handle to the expression cache if it's enabled.
    pub expr_cache_handle: Option<ExpressionCacheHandle>,
    /// The global expressions that were cached in `expr_cache_handle`.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// The local expressions that were NOT cached in `expr_cache_handle`
    /// (i.e. cache misses that callers may want to persist later).
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
90
/// The outcome of [`Catalog::open`]: the opened catalog plus the subset of
/// [`InitializeStateResult`] that callers of `open` still need.
pub struct OpenCatalogResult {
    /// An opened [`Catalog`].
    pub catalog: Catalog,
    /// A set of new shards that may need to be initialized (only used by 0dt migration).
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// A set of new builtin items added during this startup.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// A list of builtin table updates corresponding to the initialized state.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// The global expressions that were cached in the expression cache.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// The local expressions that were NOT cached in the expression cache.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
105
106impl Catalog {
    /// Initializes a CatalogState. Separate from [`Catalog::open`] to avoid depending on state
    /// external to a [mz_catalog::durable::DurableCatalogState]
    /// (for example: no [mz_secrets::SecretsReader]).
    ///
    /// At a high level this: validates builtin names, seeds an empty in-memory
    /// [`CatalogState`], runs durable migrations inside a transaction, applies the durable
    /// snapshot to memory in dependency order (pre-item → system-item → item → post-item),
    /// optionally opens the expression cache, runs AST and builtin-schema migrations, and
    /// finally commits the transaction at `config.boot_ts`.
    ///
    /// # Errors
    ///
    /// Returns an error if the durable catalog cannot be read or committed, if any
    /// migration fails, or if configured system parameters / the `mz_system` password
    /// are invalid.
    pub async fn initialize_state<'a>(
        config: StateConfig,
        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
    ) -> Result<InitializeStateResult, AdapterError> {
        // Sanity-check that every builtin role and cluster uses a reserved name prefix,
        // so they can never collide with user-created objects.
        for builtin_role in BUILTIN_ROLES {
            assert!(
                is_reserved_name(builtin_role.name),
                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }
        for builtin_cluster in BUILTIN_CLUSTERS {
            assert!(
                is_reserved_name(builtin_cluster.name),
                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }

        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
        if config.all_features {
            system_configuration.enable_all_feature_flags_by_default();
        }

        // Build an initially-empty in-memory catalog state; it is populated below by
        // applying updates read from the durable catalog.
        let mut state = CatalogState {
            database_by_name: imbl::OrdMap::new(),
            database_by_id: imbl::OrdMap::new(),
            entry_by_id: imbl::OrdMap::new(),
            entry_by_global_id: imbl::OrdMap::new(),
            notices_by_dep_id: imbl::OrdMap::new(),
            ambient_schemas_by_name: imbl::OrdMap::new(),
            ambient_schemas_by_id: imbl::OrdMap::new(),
            clusters_by_name: imbl::OrdMap::new(),
            clusters_by_id: imbl::OrdMap::new(),
            roles_by_name: imbl::OrdMap::new(),
            roles_by_id: imbl::OrdMap::new(),
            network_policies_by_id: imbl::OrdMap::new(),
            role_auth_by_id: imbl::OrdMap::new(),
            network_policies_by_name: imbl::OrdMap::new(),
            system_configuration: Arc::new(system_configuration),
            default_privileges: Arc::new(DefaultPrivileges::default()),
            system_privileges: Arc::new(PrivilegeMap::default()),
            comments: Arc::new(CommentsMap::default()),
            source_references: imbl::OrdMap::new(),
            storage_metadata: Arc::new(StorageMetadata::default()),
            temporary_schemas: imbl::OrdMap::new(),
            // Filled in later from the durable catalog, after `pre_item_updates`.
            mock_authentication_nonce: Default::default(),
            config: mz_sql::catalog::CatalogConfig {
                start_time: to_datetime((config.now)()),
                start_instant: Instant::now(),
                nonce: rand::random(),
                environment_id: config.environment_id,
                session_id: Uuid::new_v4(),
                build_info: config.build_info,
                now: config.now.clone(),
                connection_context: config.connection_context,
                builtins_cfg: BuiltinsConfig {
                    // This will fall back to the default in code (false) if we timeout on the
                    // initial LD sync. We're just using this to get some additional testing in on
                    // CTs so a false negative is fine, we're only worried about false positives.
                    include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
                        &config.system_parameter_defaults,
                        config.remote_system_parameters.as_ref(),
                        &ENABLE_CONTINUAL_TASK_BUILTINS,
                    ),
                },
                helm_chart_version: config.helm_chart_version,
            },
            cluster_replica_sizes: config.cluster_replica_sizes,
            availability_zones: config.availability_zones,
            egress_addresses: config.egress_addresses,
            aws_principal_context: config.aws_principal_context,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            http_host_name: config.http_host_name,
            license_key: config.license_key,
        };

        let deploy_generation = storage.get_deployment_generation().await?;

        // Snapshot the durable catalog; all subsequent durable changes happen in `txn`.
        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
        let mut txn = storage.transaction().await?;

        // Migrate/update durable data before we start loading the in-memory catalog.
        let new_builtin_collections = {
            migrate::durable_migrate(
                &mut txn,
                state.config.environment_id.organization_id(),
                config.boot_ts,
            )?;
            // Overwrite and persist selected parameter values in `remote_system_parameters` that
            // was pulled from a remote frontend (e.g. LaunchDarkly) if present.
            if let Some(remote_system_parameters) = config.remote_system_parameters {
                for (name, value) in remote_system_parameters {
                    txn.upsert_system_config(&name, value)?;
                }
                txn.set_system_config_synced_once()?;
            }
            // Add any new builtin objects and remove old ones.
            let new_builtin_collections =
                add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
                system_cluster: config.builtin_system_cluster_config,
                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
                probe_cluster: config.builtin_probe_cluster_config,
                support_cluster: config.builtin_support_cluster_config,
                analytics_cluster: config.builtin_analytics_cluster_config,
            };
            add_new_remove_old_builtin_clusters_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                config.boot_ts,
            )?;
            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
            add_new_remove_old_builtin_cluster_replicas_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                config.boot_ts,
            )?;
            add_new_remove_old_builtin_roles_migration(&mut txn)?;
            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
            remove_pending_cluster_replicas_migration(&mut txn, config.boot_ts)?;

            new_builtin_collections
        };

        // Fold the updates produced by the migrations above into the snapshot.
        let op_updates = txn.get_and_commit_op_updates();
        updates.extend(op_updates);

        let mut builtin_table_updates = Vec::new();

        // Seed the in-memory catalog with values that don't come from the durable catalog.
        {
            // Set defaults from configuration passed in the provided `system_parameter_defaults`
            // map.
            for (name, value) in config.system_parameter_defaults {
                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
                    Ok(_) => (),
                    Err(Error {
                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
                    }) => {
                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
                    }
                    Err(e) => return Err(e.into()),
                };
            }
            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
        }

        // Make life easier by consolidating all updates, so that we end up with only positive
        // diffs.
        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
        differential_dataflow::consolidation::consolidate_updates(&mut updates);
        soft_assert_no_log!(
            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "consolidated updates should be positive during startup: {updates:?}"
        );

        // Partition the snapshot by kind so it can be applied in dependency order:
        // roles/databases/clusters first, then system items, then user items, then
        // metadata that references items, with audit logs handled separately.
        let mut pre_item_updates = Vec::new();
        let mut system_item_updates = Vec::new();
        let mut item_updates = Vec::new();
        let mut post_item_updates = Vec::new();
        let mut audit_log_updates = Vec::new();
        for (kind, ts, diff) in updates {
            match kind {
                BootstrapStateUpdateKind::Role(_)
                | BootstrapStateUpdateKind::RoleAuth(_)
                | BootstrapStateUpdateKind::Database(_)
                | BootstrapStateUpdateKind::Schema(_)
                | BootstrapStateUpdateKind::DefaultPrivilege(_)
                | BootstrapStateUpdateKind::SystemPrivilege(_)
                | BootstrapStateUpdateKind::SystemConfiguration(_)
                | BootstrapStateUpdateKind::Cluster(_)
                | BootstrapStateUpdateKind::NetworkPolicy(_)
                | BootstrapStateUpdateKind::ClusterReplica(_) => {
                    pre_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
                    system_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
                    kind: kind.into(),
                    ts,
                    diff: diff.try_into().expect("valid diff"),
                }),
                BootstrapStateUpdateKind::Comment(_)
                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
                | BootstrapStateUpdateKind::SourceReferences(_)
                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
                    // Kept as raw tuples (not `StateUpdate`) because migrations below may
                    // append more of these, after which they are consolidated together.
                    post_item_updates.push((kind, ts, diff));
                }
                BootstrapStateUpdateKind::AuditLog(_) => {
                    audit_log_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    });
                }
            }
        }

        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(pre_item_updates, &mut LocalExpressionCache::Closed)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // Ensure mz_system has a password if configured to have one.
        // It's important we do this after the `pre_item_updates` so that
        // the mz_system role exists in the catalog.
        {
            if let Some(password) = config.external_login_password_mz_system {
                let role_auth = RoleAuth {
                    role_id: MZ_SYSTEM_ROLE_ID,
                    // builtin roles should always use a secure scram iteration
                    // <https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html>
                    password_hash: Some(
                        scram256_hash(&password, &NonZeroU32::new(600_000).expect("known valid"))
                            .map_err(|_| {
                            AdapterError::Internal("Failed to hash mz_system password.".to_owned())
                        })?,
                    ),
                    updated_at: SYSTEM_TIME(),
                };
                state
                    .role_auth_by_id
                    .insert(MZ_SYSTEM_ROLE_ID, role_auth.clone());
                let builtin_table_update = state.generate_builtin_table_update(
                    mz_catalog::memory::objects::StateUpdateKind::RoleAuth(role_auth.into()),
                    mz_catalog::memory::objects::StateDiff::Addition,
                );
                builtin_table_updates.extend(builtin_table_update);
            }
        }

        let expr_cache_start = Instant::now();
        info!("startup: coordinator init: catalog open: expr cache open beginning");
        // We wait until after the `pre_item_updates` to open the cache so we can get accurate
        // dyncfgs because the `pre_item_updates` contains `SystemConfiguration` updates.
        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
        let expr_cache_enabled = config
            .enable_expression_cache_override
            .unwrap_or(enable_expr_cache_dyncfg);
        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
            info!(
                ?config.enable_expression_cache_override,
                ?enable_expr_cache_dyncfg,
                "using expression cache for startup"
            );
            // Every global ID currently in the catalog (items, their extra versions, and
            // system object mappings); the cache uses this set to prune stale entries.
            let current_ids = txn
                .get_items()
                .flat_map(|item| {
                    let gid = item.global_id.clone();
                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
                    std::iter::once(gid).chain(gids.into_iter())
                })
                .chain(
                    txn.get_system_object_mappings()
                        .map(|som| som.unique_identifier.global_id),
                )
                .collect();
            let dyncfgs = config.persist_client.dyncfgs().clone();
            let build_version = if config.build_info.is_dev() {
                // A single dev version can be used for many different builds, so we need to use
                // the build version that is also enriched with build metadata.
                config
                    .build_info
                    .semver_version_build()
                    .expect("build ID is not available on your platform!")
            } else {
                config.build_info.semver_version()
            };
            let expr_cache_config = ExpressionCacheConfig {
                build_version,
                shard_id: txn
                    .get_expression_cache_shard()
                    .expect("expression cache shard should exist for opened catalogs"),
                persist: config.persist_client,
                current_ids,
                remove_prior_versions: !config.read_only,
                compact_shard: config.read_only,
                dyncfgs,
            };
            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
            (
                Some(expr_cache_handle),
                cached_local_exprs,
                cached_global_exprs,
            )
        } else {
            (None, BTreeMap::new(), BTreeMap::new())
        };
        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
        info!(
            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
            expr_cache_start.elapsed()
        );

        // When initializing/bootstrapping, we don't use the catalog updates but
        // instead load the catalog fully and then go ahead and apply commands
        // to the controller(s). Maybe we _should_ instead use the same logic
        // and return and use the updates from here. But that's at the very
        // least future work.
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(system_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // The migration version recorded by the previous build, or "new" for a
        // freshly-created catalog.
        let last_seen_version =
            get_migration_version(&txn).map_or_else(|| "new".into(), |v| v.to_string());

        let mz_authentication_mock_nonce =
            txn.get_authentication_mock_nonce().ok_or_else(|| {
                Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
            })?;

        state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);

        // Migrate item ASTs.
        let (builtin_table_update, _catalog_updates) = if !config.skip_migrations {
            let migrate_result = migrate::migrate(
                &mut state,
                &mut txn,
                &mut local_expr_cache,
                item_updates,
                config.now,
                config.boot_ts,
            )
            .await
            .map_err(|e| {
                Error::new(ErrorKind::FailedCatalogMigration {
                    last_seen_version: last_seen_version.clone(),
                    this_version: config.build_info.version,
                    cause: e.to_string(),
                })
            })?;
            if !migrate_result.post_item_updates.is_empty() {
                // Include any post-item-updates generated by migrations, and then consolidate
                // them to ensure diffs are all positive.
                post_item_updates.extend(migrate_result.post_item_updates);
                // Push everything to the same timestamp so it consolidates cleanly.
                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
                    for (_, ts, _) in &mut post_item_updates {
                        *ts = max_ts;
                    }
                }
                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
            }

            (
                migrate_result.builtin_table_updates,
                migrate_result.catalog_updates,
            )
        } else {
            // Migrations skipped: apply the item updates as-is.
            state
                .apply_updates(item_updates, &mut local_expr_cache)
                .await
        };
        builtin_table_updates.extend(builtin_table_update);

        let post_item_updates = post_item_updates
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate {
                kind: kind.into(),
                ts,
                diff: diff.try_into().expect("valid diff"),
            })
            .collect();
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(post_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // We don't need to apply the audit logs in memory, yet apply can be expensive when the
        // audit log grows large. Therefore, we skip the apply step and just generate the builtin
        // updates.
        for audit_log_update in audit_log_updates {
            builtin_table_updates.extend(
                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
            );
        }

        // Migrate builtin items.
        let schema_migration_result = builtin_schema_migration::run(
            config.build_info,
            deploy_generation,
            &mut txn,
            config.builtin_item_migration_config,
        )
        .await?;

        let state_updates = txn.get_and_commit_op_updates();

        // When initializing/bootstrapping, we don't use the catalog updates but
        // instead load the catalog fully and then go ahead and apply commands
        // to the controller(s). Maybe we _should_ instead use the same logic
        // and return and use the updates from here. But that's at the very
        // least future work.
        let (table_updates, _catalog_updates) = state
            .apply_updates(state_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(table_updates);
        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);

        // Bump the migration version immediately before committing.
        set_migration_version(&mut txn, config.build_info.semver_version())?;

        txn.commit(config.boot_ts).await?;

        // Now that the migration is durable, run any requested deferred cleanup.
        schema_migration_result.cleanup_action.await;

        Ok(InitializeStateResult {
            state,
            migrated_storage_collections_0dt: schema_migration_result.replaced_items,
            new_builtin_collections: new_builtin_collections.into_iter().collect(),
            builtin_table_updates,
            last_seen_version,
            expr_cache_handle,
            cached_global_exprs,
            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
        })
    }
542
    /// Opens or creates a catalog that stores data at `path`.
    ///
    /// Returns the catalog, metadata about builtin objects that have changed
    /// schemas since last restart, a list of updates to builtin tables that
    /// describe the initial state of the catalog, and the version of the
    /// catalog before any migrations were performed.
    ///
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 17KB. This would
    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
    #[instrument(name = "catalog::open")]
    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
        async move {
            let mut storage = config.storage;

            // Load and migrate all durable state into memory; this also commits
            // the startup transaction.
            let InitializeStateResult {
                state,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                mut builtin_table_updates,
                last_seen_version: _,
                expr_cache_handle,
                cached_global_exprs,
                uncached_local_exprs,
            } =
                // BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
                // get stored on the stack which is bad for runtime performance, and blow up our stack usage.
                // Because of that we purposefully move this Future onto the heap (i.e. Box it).
                Self::initialize_state(config.state, &mut storage)
                    .instrument(tracing::info_span!("catalog::initialize_state"))
                    .boxed()
                    .await?;

            let catalog = Catalog {
                state,
                expr_cache_handle,
                transient_revision: 1,
                storage: Arc::new(tokio::sync::Mutex::new(storage)),
            };

            // Operators aren't stored in the catalog, but we would like them in
            // introspection views.
            for (op, func) in OP_IMPLS.iter() {
                match func {
                    mz_sql::func::Func::Scalar(impls) => {
                        for imp in impls {
                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
                            ));
                        }
                    }
                    _ => unreachable!("all operators must be scalar functions"),
                }
            }

            // Likewise, surface the configured egress addresses in introspection.
            for ip in &catalog.state.egress_addresses {
                builtin_table_updates.push(
                    catalog
                        .state
                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
                );
            }

            // Surface the license key in introspection, but only when one is configured
            // (a non-empty id).
            if !catalog.state.license_key.id.is_empty() {
                builtin_table_updates.push(
                    catalog.state.resolve_builtin_table_update(
                        catalog
                            .state
                            .pack_license_key_update(&catalog.state.license_key)?,
                    ),
                );
            }

            // Record in the durable catalog that bootstrap finished successfully.
            catalog.storage().await.mark_bootstrap_complete().await;

            Ok(OpenCatalogResult {
                catalog,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                builtin_table_updates,
                cached_global_exprs,
                uncached_local_exprs,
            })
        }
        .instrument(tracing::info_span!("catalog::open"))
        .boxed()
    }
630
    /// Initializes STORAGE to understand all shards that `self` expects to
    /// exist.
    ///
    /// Note that this must be done before creating/rendering collections
    /// because the storage controller might not be aware of new system
    /// collections created between versions.
    ///
    /// On success the updated state is swapped into `self`; on any error the
    /// original `self.state` is left untouched (see the clone below).
    async fn initialize_storage_state(
        &mut self,
        storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
    ) -> Result<(), mz_catalog::durable::CatalogError> {
        // Every global ID of every storage-backed catalog entry.
        let collections = self
            .entries()
            .filter(|entry| entry.item().is_storage_collection())
            .flat_map(|entry| entry.global_ids())
            .collect();

        // Clone the state so that any errors that occur do not leak any
        // transformations on error.
        let mut state = self.state.clone();

        let mut storage = self.storage().await;
        let shard_id = storage.shard_id();
        let mut txn = storage.transaction().await?;

        // Ensure the storage controller knows about the catalog shard and associates it with the
        // `MZ_CATALOG_RAW` builtin source.
        let item_id = self.resolve_builtin_storage_collection(&MZ_CATALOG_RAW);
        let global_id = self.get_entry(&item_id).latest_global_id();
        match txn.get_collection_metadata().get(&global_id) {
            None => {
                txn.insert_collection_metadata([(global_id, shard_id)].into())
                    .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            }
            // Already recorded: it must still point at the catalog shard.
            Some(id) => assert_eq!(*id, shard_id),
        }

        // Hand the transaction to the storage controller so it can record the
        // metadata it needs for these collections.
        storage_collections
            .initialize_state(&mut txn, collections)
            .await
            .map_err(mz_catalog::durable::DurableCatalogError::from)?;

        let updates = txn.get_and_commit_op_updates();
        let (builtin_updates, catalog_updates) = state
            .apply_updates(updates, &mut LocalExpressionCache::Closed)
            .await;
        assert!(
            builtin_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would cause changes to builtin tables"
        );
        assert!(
            catalog_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would change the catalog or controller state"
        );
        let commit_ts = txn.upper();
        txn.commit(commit_ts).await?;
        drop(storage);

        // Save updated state.
        self.state = state;
        Ok(())
    }
692
    /// [`mz_controller::Controller`] depends on durable catalog state to boot,
    /// so make it available and initialize the controller.
    ///
    /// Commits a preparatory transaction (which must produce no catalog
    /// updates), hands the controller a read-only transaction to boot from,
    /// and then registers all storage collections with the new controller.
    pub async fn initialize_controller(
        &mut self,
        config: mz_controller::ControllerConfig,
        envd_epoch: core::num::NonZeroI64,
        read_only: bool,
    ) -> Result<mz_controller::Controller, mz_catalog::durable::CatalogError> {
        let controller_start = Instant::now();
        info!("startup: controller init: beginning");

        let controller = {
            let mut storage = self.storage().await;
            let mut tx = storage.transaction().await?;
            // Durable bookkeeping the controller needs before it can boot.
            mz_controller::prepare_initialization(&mut tx)
                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            let updates = tx.get_and_commit_op_updates();
            assert!(
                updates.is_empty(),
                "initializing controller should not produce updates: {updates:?}"
            );
            let commit_ts = tx.upper();
            tx.commit(commit_ts).await?;

            // The controller only reads during boot, so it gets a fresh
            // transaction that is never committed.
            let read_only_tx = storage.transaction().await?;

            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
        };

        // Must happen before collections are created/rendered; see
        // `initialize_storage_state`.
        self.initialize_storage_state(&controller.storage_collections)
            .await?;

        info!(
            "startup: controller init: complete in {:?}",
            controller_start.elapsed()
        );

        Ok(controller)
    }
732
733    /// Politely releases all external resources that can only be released in an async context.
734    pub async fn expire(self) {
735        // If no one else holds a reference to storage, then clean up the storage resources.
736        // Otherwise, hopefully the other reference cleans up the resources when it's dropped.
737        if let Some(storage) = Arc::into_inner(self.storage) {
738            let storage = storage.into_inner();
739            storage.expire().await;
740        }
741    }
742}
743
744impl CatalogState {
745    /// Set the default value for `name`, which is the value it will be reset to.
746    fn set_system_configuration_default(
747        &mut self,
748        name: &str,
749        value: VarInput,
750    ) -> Result<(), Error> {
751        Ok(Arc::make_mut(&mut self.system_configuration).set_default(name, value)?)
752    }
753}
754
/// Updates the catalog with new and removed builtin items.
///
/// Diffs the builtin objects compiled into this binary against the system
/// object mappings persisted in the durable catalog: brand-new builtins get
/// fresh IDs and mappings (and, for runtime-alterable connections, a durable
/// item record); builtins that disappeared from the binary are removed along
/// with their comments. Builtin object comments are re-synced from the binary
/// on every boot.
///
/// Returns the list of new builtin [`GlobalId`]s.
fn add_new_remove_old_builtin_items_migration(
    builtins_cfg: &BuiltinsConfig,
    txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<Vec<GlobalId>, mz_catalog::durable::CatalogError> {
    let mut new_builtin_mappings = Vec::new();
    // Used to validate unique descriptions.
    let mut builtin_descs = HashSet::new();

    // We compare the builtin items that are compiled into the binary with the builtin items that
    // are persisted in the catalog to discover new and deleted builtin items.
    let mut builtins = Vec::new();
    for builtin in BUILTINS::iter(builtins_cfg) {
        let desc = SystemObjectDescription {
            schema_name: builtin.schema().to_string(),
            object_type: builtin.catalog_item_type(),
            object_name: builtin.name().to_string(),
        };
        // Validate that the description is unique.
        if !builtin_descs.insert(desc.clone()) {
            panic!(
                "duplicate builtin description: {:?}, {:?}",
                SystemObjectDescription {
                    schema_name: builtin.schema().to_string(),
                    object_type: builtin.catalog_item_type(),
                    object_name: builtin.name().to_string(),
                },
                builtin
            );
        }
        builtins.push((desc, builtin));
    }

    // Durable mappings keyed by description. Entries are removed below as they
    // are matched against compiled-in builtins, so whatever remains afterwards
    // corresponds to builtins deleted from the binary.
    let mut system_object_mappings: BTreeMap<_, _> = txn
        .get_system_object_mappings()
        .map(|system_object_mapping| {
            (
                system_object_mapping.description.clone(),
                system_object_mapping,
            )
        })
        .collect();

    // Split compiled-in builtins into those that already exist durably and
    // those that are brand new.
    let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
        builtins.into_iter().partition_map(|(desc, builtin)| {
            // Runtime-alterable objects can drift from their compiled-in
            // definition, so a sentinel fingerprint is stored for them.
            let fingerprint = match builtin.runtime_alterable() {
                false => builtin.fingerprint(),
                true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
            };
            match system_object_mappings.remove(&desc) {
                Some(system_object_mapping) => {
                    Either::Left((builtin, system_object_mapping, fingerprint))
                }
                None => Either::Right((builtin, fingerprint)),
            }
        });
    // Allocate one (catalog id, global id) pair per new builtin; `zip_eq`
    // asserts the counts line up.
    let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
    let new_builtins: Vec<_> = new_builtins
        .into_iter()
        .zip_eq(new_builtin_ids.clone())
        .collect();

    // Add new builtin items to catalog.
    for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
        new_builtin_mappings.push(SystemObjectMapping {
            description: SystemObjectDescription {
                schema_name: builtin.schema().to_string(),
                object_type: builtin.catalog_item_type(),
                object_name: builtin.name().to_string(),
            },
            unique_identifier: SystemObjectUniqueIdentifier {
                catalog_id,
                global_id,
                fingerprint,
            },
        });

        // Runtime-alterable system objects are durably recorded to the
        // usual items collection, so that they can be later altered at
        // runtime by their owner (i.e., outside of the usual builtin
        // migration framework that requires changes to the binary
        // itself).
        let handled_runtime_alterable = match builtin {
            Builtin::Connection(c) if c.runtime_alterable => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    c.owner_id.clone(),
                )];
                acl_items.extend_from_slice(c.access);
                // Builtin Connections cannot be versioned.
                let versions = BTreeMap::new();

                txn.insert_item(
                    catalog_id,
                    c.oid,
                    global_id,
                    mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
                    c.name,
                    c.sql.into(),
                    *c.owner_id,
                    acl_items,
                    versions,
                )?;
                true
            }
            _ => false,
        };
        // Connections are currently the only runtime-alterable builtin kind;
        // this catches any new kind the match above doesn't handle yet.
        assert_eq!(
            builtin.runtime_alterable(),
            handled_runtime_alterable,
            "runtime alterable object was not handled by migration",
        );
    }
    txn.set_system_object_mappings(new_builtin_mappings)?;

    // Update comments of all builtin objects
    let builtins_with_catalog_ids = existing_builtins
        .iter()
        .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
        .chain(
            new_builtins
                .into_iter()
                .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
        );

    for (builtin, id) in builtins_with_catalog_ids {
        // Only relation-like builtins carry column comments; everything else
        // is skipped.
        let (comment_id, desc, comments) = match builtin {
            Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
            Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
            Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
            Builtin::MaterializedView(mv) => (
                CommentObjectId::MaterializedView(id),
                &mv.desc,
                &mv.column_comments,
            ),
            Builtin::Log(_)
            | Builtin::Type(_)
            | Builtin::Func(_)
            | Builtin::ContinualTask(_)
            | Builtin::Index(_)
            | Builtin::Connection(_) => continue,
        };
        // Drop any stale comments, then re-install the compiled-in ones.
        txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;

        let mut comments = comments.clone();
        for (col_idx, name) in desc.iter_names().enumerate() {
            if let Some(comment) = comments.remove(name.as_str()) {
                // Comment column indices are 1 based
                txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
            }
        }
        assert!(
            comments.is_empty(),
            "builtin object contains dangling comments that don't correspond to columns {comments:?}"
        );
    }

    // Anything left in `system_object_mappings` must have been deleted and should be removed from
    // the catalog.
    let mut deleted_system_objects = BTreeSet::new();
    let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
    let mut deleted_comments = BTreeSet::new();
    for (desc, mapping) in system_object_mappings {
        deleted_system_objects.insert(mapping.description);
        // Runtime-alterable builtins also have a durable item record to clean up.
        if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
            deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
        }

        let id = mapping.unique_identifier.catalog_id;
        let comment_id = match desc.object_type {
            CatalogItemType::Table => CommentObjectId::Table(id),
            CatalogItemType::Source => CommentObjectId::Source(id),
            CatalogItemType::View => CommentObjectId::View(id),
            CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(id),
            CatalogItemType::Sink
            | CatalogItemType::Index
            | CatalogItemType::Type
            | CatalogItemType::Func
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => continue,
        };
        deleted_comments.insert(comment_id);
    }
    // If you are 100% positive that it is safe to delete a system object outside any of the
    // unstable schemas, then add it to this set. Make sure that no prod environments are
    // using this object and that the upgrade checker does not show any issues.
    //
    // Objects can be removed from this set after one release.
    let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
    // TODO(jkosh44) Technically we could support changing the type of a builtin object outside
    // of unstable schemas (i.e. from a table to a view). However, builtin migrations don't currently
    // handle that scenario correctly.
    assert!(
        deleted_system_objects
            .iter()
            // It's okay if Indexes change because they're inherently ephemeral.
            .filter(|object| object.object_type != CatalogItemType::Index)
            .all(
                |deleted_object| is_unstable_schema(&deleted_object.schema_name)
                    || delete_exceptions.contains(deleted_object)
            ),
        "only objects in unstable schemas can be deleted, deleted objects: {:?}",
        deleted_system_objects
    );
    txn.drop_comments(&deleted_comments)?;
    txn.remove_items(&deleted_runtime_alterable_system_ids)?;
    txn.remove_system_object_mappings(deleted_system_objects)?;

    // Filter down to just the GlobalIds which are used to track the underlying collections.
    let new_builtin_collections = new_builtin_ids
        .into_iter()
        .map(|(_catalog_id, global_id)| global_id)
        .collect();

    Ok(new_builtin_collections)
}
974
/// Reconciles the durable catalog's *system* clusters with the clusters
/// compiled into the binary (`BUILTIN_CLUSTERS`).
///
/// New builtin clusters are created using the bootstrap size/replication
/// factor from `builtin_cluster_config_map`; system clusters that no longer
/// exist in the binary are removed. Every create/drop is recorded in the
/// audit log, stamped with `boot_ts`.
fn add_new_remove_old_builtin_clusters_migration(
    txn: &mut mz_catalog::durable::Transaction<'_>,
    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
    boot_ts: Timestamp,
) -> Result<(), mz_catalog::durable::CatalogError> {
    // Durable system clusters by name. Entries are removed as they're matched
    // against `BUILTIN_CLUSTERS`, so leftovers are the clusters to delete.
    let mut durable_clusters: BTreeMap<_, _> = txn
        .get_clusters()
        .filter(|cluster| cluster.id.is_system())
        .map(|cluster| (cluster.name.to_string(), cluster))
        .collect();

    // Add new clusters.
    for builtin_cluster in BUILTIN_CLUSTERS {
        if durable_clusters.remove(builtin_cluster.name).is_none() {
            // Bootstrap size/replication factor for this builtin cluster.
            let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;

            let cluster_id = txn.insert_system_cluster(
                builtin_cluster.name,
                vec![],
                builtin_cluster.privileges.to_vec(),
                builtin_cluster.owner_id.to_owned(),
                mz_catalog::durable::ClusterConfig {
                    variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
                        size: cluster_config.size,
                        availability_zones: vec![],
                        replication_factor: cluster_config.replication_factor,
                        logging: default_logging_config(),
                        optimizer_feature_overrides: Default::default(),
                        schedule: Default::default(),
                    }),
                    workload_class: None,
                },
                &HashSet::new(),
            )?;

            // Make the creation visible in the audit log.
            let audit_id = txn.allocate_audit_log_id()?;
            txn.insert_audit_log_event(VersionedEvent::new(
                audit_id,
                EventType::Create,
                ObjectType::Cluster,
                EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                    id: cluster_id.to_string(),
                    name: builtin_cluster.name.to_string(),
                }),
                None,
                boot_ts.into(),
            ));
        }
    }

    // Remove old clusters.
    let old_clusters = durable_clusters
        .values()
        .map(|cluster| cluster.id)
        .collect();
    txn.remove_clusters(&old_clusters)?;

    // Make each removal visible in the audit log.
    for (_name, cluster) in &durable_clusters {
        let audit_id = txn.allocate_audit_log_id()?;
        txn.insert_audit_log_event(VersionedEvent::new(
            audit_id,
            EventType::Drop,
            ObjectType::Cluster,
            EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                id: cluster.id.to_string(),
                name: cluster.name.clone(),
            }),
            None,
            boot_ts.into(),
        ));
    }

    Ok(())
}
1049
1050fn add_new_remove_old_builtin_introspection_source_migration(
1051    txn: &mut mz_catalog::durable::Transaction<'_>,
1052) -> Result<(), AdapterError> {
1053    let mut new_indexes = Vec::new();
1054    let mut removed_indexes = BTreeSet::new();
1055    for cluster in txn.get_clusters() {
1056        let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
1057
1058        let mut new_logs = Vec::new();
1059
1060        for log in BUILTINS::logs() {
1061            if introspection_source_index_ids.remove(log.name).is_none() {
1062                new_logs.push(log);
1063            }
1064        }
1065
1066        for log in new_logs {
1067            let (item_id, gid) =
1068                Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1069            new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1070        }
1071
1072        // Anything left in `introspection_source_index_ids` must have been deleted and should be
1073        // removed from the catalog.
1074        removed_indexes.extend(
1075            introspection_source_index_ids
1076                .into_keys()
1077                .map(|name| (cluster.id, name.to_string())),
1078        );
1079    }
1080    txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1081    txn.remove_introspection_source_indexes(removed_indexes)?;
1082    Ok(())
1083}
1084
1085fn add_new_remove_old_builtin_roles_migration(
1086    txn: &mut mz_catalog::durable::Transaction<'_>,
1087) -> Result<(), mz_catalog::durable::CatalogError> {
1088    let mut durable_roles: BTreeMap<_, _> = txn
1089        .get_roles()
1090        .filter(|role| role.id.is_system() || role.id.is_predefined())
1091        .map(|role| (role.name.to_string(), role))
1092        .collect();
1093
1094    // Add new roles.
1095    for builtin_role in BUILTIN_ROLES {
1096        if durable_roles.remove(builtin_role.name).is_none() {
1097            txn.insert_builtin_role(
1098                builtin_role.id,
1099                builtin_role.name.to_string(),
1100                builtin_role.attributes.clone(),
1101                RoleMembership::new(),
1102                RoleVars::default(),
1103                builtin_role.oid,
1104            )?;
1105        }
1106    }
1107
1108    // Remove old roles.
1109    let old_roles = durable_roles.values().map(|role| role.id).collect();
1110    txn.remove_roles(&old_roles)?;
1111
1112    Ok(())
1113}
1114
1115fn add_new_remove_old_builtin_cluster_replicas_migration(
1116    txn: &mut Transaction<'_>,
1117    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1118    boot_ts: Timestamp,
1119) -> Result<(), AdapterError> {
1120    let cluster_lookup: BTreeMap<_, _> = txn
1121        .get_clusters()
1122        .map(|cluster| (cluster.name.clone(), cluster.clone()))
1123        .collect();
1124
1125    let cluster_id_to_name: BTreeMap<ClusterId, String> = cluster_lookup
1126        .values()
1127        .map(|cluster| (cluster.id, cluster.name.clone()))
1128        .collect();
1129
1130    let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1131        .get_cluster_replicas()
1132        .filter(|replica| replica.replica_id.is_system())
1133        .fold(BTreeMap::new(), |mut acc, replica| {
1134            acc.entry(replica.cluster_id)
1135                .or_insert_with(BTreeMap::new)
1136                .insert(replica.name.to_string(), replica);
1137            acc
1138        });
1139
1140    // Add new replicas.
1141    for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1142        let cluster = cluster_lookup
1143            .get(builtin_replica.cluster_name)
1144            .expect("builtin cluster replica references non-existent cluster");
1145        // `empty_map` is a hack to simplify the if statement below.
1146        let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1147        let replica_names = durable_replicas
1148            .get_mut(&cluster.id)
1149            .unwrap_or(&mut empty_map);
1150
1151        let builtin_cluster_bootstrap_config =
1152            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1153        if replica_names.remove(builtin_replica.name).is_none()
1154            // NOTE(SangJunBak): We need to explicitly check the replication factor because
1155            // BUILT_IN_CLUSTER_REPLICAS is constant throughout all deployments but the replication
1156            // factor is configurable on bootstrap.
1157            && builtin_cluster_bootstrap_config.replication_factor > 0
1158        {
1159            let replica_size = match cluster.config.variant {
1160                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1161                ClusterVariant::Unmanaged => builtin_cluster_bootstrap_config.size.clone(),
1162            };
1163
1164            let config = builtin_cluster_replica_config(replica_size.clone());
1165            let replica_id = txn.insert_cluster_replica(
1166                cluster.id,
1167                builtin_replica.name,
1168                config,
1169                MZ_SYSTEM_ROLE_ID,
1170            )?;
1171
1172            let audit_id = txn.allocate_audit_log_id()?;
1173            txn.insert_audit_log_event(VersionedEvent::new(
1174                audit_id,
1175                EventType::Create,
1176                ObjectType::ClusterReplica,
1177                EventDetails::CreateClusterReplicaV4(mz_audit_log::CreateClusterReplicaV4 {
1178                    cluster_id: cluster.id.to_string(),
1179                    cluster_name: cluster.name.clone(),
1180                    replica_id: Some(replica_id.to_string()),
1181                    replica_name: builtin_replica.name.to_string(),
1182                    logical_size: replica_size,
1183                    billed_as: None,
1184                    internal: false,
1185                    reason: CreateOrDropClusterReplicaReasonV1::System,
1186                    scheduling_policies: None,
1187                }),
1188                None,
1189                boot_ts.into(),
1190            ));
1191        }
1192    }
1193
1194    // Remove old replicas.
1195    let old_replicas: Vec<_> = durable_replicas
1196        .values()
1197        .flat_map(|replicas| replicas.values())
1198        .collect();
1199    let old_replica_ids = old_replicas.iter().map(|r| r.replica_id).collect();
1200    txn.remove_cluster_replicas(&old_replica_ids)?;
1201
1202    for replica in &old_replicas {
1203        let cluster_name = cluster_id_to_name
1204            .get(&replica.cluster_id)
1205            .cloned()
1206            .unwrap_or_else(|| "<unknown>".to_string());
1207
1208        let audit_id = txn.allocate_audit_log_id()?;
1209        txn.insert_audit_log_event(VersionedEvent::new(
1210            audit_id,
1211            EventType::Drop,
1212            ObjectType::ClusterReplica,
1213            EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1214                cluster_id: replica.cluster_id.to_string(),
1215                cluster_name,
1216                replica_id: Some(replica.replica_id.to_string()),
1217                replica_name: replica.name.clone(),
1218                reason: CreateOrDropClusterReplicaReasonV1::System,
1219                scheduling_policies: None,
1220            }),
1221            None,
1222            boot_ts.into(),
1223        ));
1224    }
1225
1226    Ok(())
1227}
1228
1229/// Roles can have default values for configuration parameters, e.g. you can set a Role default for
1230/// the 'cluster' parameter.
1231///
1232/// This migration exists to remove the Role default for a configuration parameter, if the persisted
1233/// input is no longer valid. For example if we remove a configuration parameter or change the
1234/// accepted set of values.
1235fn remove_invalid_config_param_role_defaults_migration(
1236    txn: &mut Transaction<'_>,
1237) -> Result<(), AdapterError> {
1238    static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1239
1240    let roles_to_migrate: BTreeMap<_, _> = txn
1241        .get_roles()
1242        .filter_map(|mut role| {
1243            // Create an empty SessionVars just so we can check if a var is valid.
1244            //
1245            // TODO(parkmycar): This is a bit hacky, instead we should have a static list of all
1246            // session variables.
1247            let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1248
1249            // Iterate over all of the variable defaults for this role.
1250            let mut invalid_roles_vars = BTreeMap::new();
1251            for (name, value) in &role.vars.map {
1252                // If one does not exist or its value is invalid, then mark it for removal.
1253                let Ok(session_var) = session_vars.inspect(name) else {
1254                    invalid_roles_vars.insert(name.clone(), value.clone());
1255                    continue;
1256                };
1257                if session_var.check(value.borrow()).is_err() {
1258                    invalid_roles_vars.insert(name.clone(), value.clone());
1259                }
1260            }
1261
1262            // If the role has no invalid values, nothing to do!
1263            if invalid_roles_vars.is_empty() {
1264                return None;
1265            }
1266
1267            tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1268
1269            // Otherwise, remove the variables from the role and return it to be updated.
1270            for (name, _value) in invalid_roles_vars {
1271                role.vars.map.remove(&name);
1272            }
1273            Some(role)
1274        })
1275        .map(|role| (role.id, role))
1276        .collect();
1277
1278    txn.update_roles_without_auth(roles_to_migrate)?;
1279
1280    Ok(())
1281}
1282
1283/// Cluster Replicas may be created ephemerally during an alter statement, these replicas
1284/// are marked as pending and should be cleaned up on catalog open.
1285fn remove_pending_cluster_replicas_migration(
1286    tx: &mut Transaction,
1287    boot_ts: mz_repr::Timestamp,
1288) -> Result<(), anyhow::Error> {
1289    // Build a map of cluster_id -> cluster_name for audit events.
1290    let cluster_names: BTreeMap<_, _> = tx.get_clusters().map(|c| (c.id, c.name)).collect();
1291
1292    let occurred_at = boot_ts.into();
1293
1294    for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1295        if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1296            replica.config.location
1297        {
1298            let cluster_name = cluster_names
1299                .get(&replica.cluster_id)
1300                .cloned()
1301                .unwrap_or_else(|| "<unknown>".to_string());
1302
1303            info!(
1304                "removing pending cluster replica '{}' from cluster '{}'",
1305                replica.name, cluster_name,
1306            );
1307
1308            tx.remove_cluster_replica(replica.replica_id)?;
1309
1310            // Emit an audit log event so that the drop is visible in
1311            // mz_audit_events, matching the create event that was
1312            // recorded when the pending replica was first created.
1313            let audit_id = tx.allocate_audit_log_id()?;
1314            tx.insert_audit_log_event(VersionedEvent::new(
1315                audit_id,
1316                EventType::Drop,
1317                ObjectType::ClusterReplica,
1318                EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1319                    cluster_id: replica.cluster_id.to_string(),
1320                    cluster_name,
1321                    replica_id: Some(replica.replica_id.to_string()),
1322                    replica_name: replica.name,
1323                    reason: CreateOrDropClusterReplicaReasonV1::System,
1324                    scheduling_policies: None,
1325                }),
1326                None,
1327                occurred_at,
1328            ));
1329        }
1330    }
1331    Ok(())
1332}
1333
1334pub(crate) fn builtin_cluster_replica_config(
1335    replica_size: String,
1336) -> mz_catalog::durable::ReplicaConfig {
1337    mz_catalog::durable::ReplicaConfig {
1338        location: mz_catalog::durable::ReplicaLocation::Managed {
1339            availability_zone: None,
1340            billed_as: None,
1341            pending: false,
1342            internal: false,
1343            size: replica_size,
1344        },
1345        logging: default_logging_config(),
1346    }
1347}
1348
1349fn default_logging_config() -> ReplicaLogging {
1350    ReplicaLogging {
1351        log_logging: false,
1352        interval: Some(Duration::from_secs(1)),
1353    }
1354}
1355
/// Bootstrap-time configuration (size and replication factor) for each of the
/// builtin clusters, consulted when a builtin cluster or replica is first
/// created during catalog open.
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Size and replication factor to default system_cluster on bootstrap
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default catalog_server_cluster on bootstrap
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default probe_cluster on bootstrap
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default support_cluster on bootstrap
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Size to default analytics_cluster on bootstrap
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1369
1370impl BuiltinBootstrapClusterConfigMap {
1371    /// Gets the size of the builtin cluster based on the provided name
1372    fn get_config(
1373        &self,
1374        cluster_name: &str,
1375    ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1376        let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1377            &self.system_cluster
1378        } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1379            &self.catalog_server_cluster
1380        } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1381            &self.probe_cluster
1382        } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1383            &self.support_cluster
1384        } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1385            &self.analytics_cluster
1386        } else {
1387            return Err(mz_catalog::durable::CatalogError::Catalog(
1388                SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1389            ));
1390        };
1391        Ok(cluster_config.clone())
1392    }
1393}
1394
1395/// Convert `updates` into a `Vec` that can be consolidated by doing the following:
1396///
1397///   - Convert each update into a type that implements [`std::cmp::Ord`].
1398///   - Update the timestamp of each update to the same value.
1399///   - Convert the diff of each update to a type that implements
1400///     [`differential_dataflow::difference::Semigroup`].
1401///
1402/// [`mz_catalog::memory::objects::StateUpdateKind`] does not implement [`std::cmp::Ord`] only
1403/// because it contains a variant for temporary items, which do not implement [`std::cmp::Ord`].
1404/// However, we know that during bootstrap no temporary items exist, because they are not persisted
1405/// and are only created after bootstrap is complete. So we forcibly convert each
1406/// [`mz_catalog::memory::objects::StateUpdateKind`] into an [`BootstrapStateUpdateKind`], which is
1407/// identical to [`mz_catalog::memory::objects::StateUpdateKind`] except it doesn't have a
1408/// temporary item variant and does implement [`std::cmp::Ord`].
1409///
1410/// WARNING: Do not call outside of startup.
1411pub(crate) fn into_consolidatable_updates_startup(
1412    updates: Vec<StateUpdate>,
1413    ts: Timestamp,
1414) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1415    updates
1416        .into_iter()
1417        .map(|StateUpdate { kind, ts: _, diff }| {
1418            let kind: BootstrapStateUpdateKind = kind
1419                .try_into()
1420                .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1421            (kind, ts, Diff::from(diff))
1422        })
1423        .collect()
1424}
1425
1426fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1427    defaults: &BTreeMap<String, String>,
1428    remote: Option<&BTreeMap<String, String>>,
1429    cfg: &mz_dyncfg::Config<T>,
1430) -> T::ConfigType {
1431    let mut val = T::into_config_type(cfg.default().clone());
1432    let get_fn = |map: &BTreeMap<String, String>| {
1433        let val = map.get(cfg.name())?;
1434        match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1435            Ok(x) => Some(x),
1436            Err(err) => {
1437                tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1438                None
1439            }
1440        }
1441    };
1442    if let Some(x) = get_fn(defaults) {
1443        val = x;
1444    }
1445    if let Some(x) = remote.and_then(get_fn) {
1446        val = x;
1447    }
1448    val
1449}