
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to opening a [`Catalog`].
mod builtin_schema_migration;

use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::future::{BoxFuture, FutureExt};
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
use mz_audit_log::{
    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, ObjectType, VersionedEvent,
};
use mz_auth::hash::scram256_hash;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
    Fingerprint, MZ_CATALOG_RAW, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
};
use mz_catalog::config::StateConfig;
use mz_catalog::durable::objects::{
    SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
};
use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
use mz_catalog::expr_cache::{
    ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
};
use mz_controller::clusters::ReplicaLogging;
use mz_controller_types::ClusterId;
use mz_ore::cast::usize_to_u64;
use mz_ore::collections::HashSet;
use mz_ore::now::{SYSTEM_TIME, to_datetime};
use mz_ore::{instrument, soft_assert_no_log};
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::namespaces::is_unstable_schema;
use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
use mz_sql::catalog::{
    BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
};
use mz_sql::func::OP_IMPLS;
use mz_sql::names::CommentObjectId;
use mz_sql::rbac;
use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
use mz_storage_client::controller::{StorageMetadata, StorageTxn};
use mz_storage_client::storage_collections::StorageCollections;
use tracing::{Instrument, info, warn};
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::AdapterError;
use crate::catalog::migrate::{self, get_migration_version, set_migration_version};
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{
    BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name,
};

/// Everything produced by [`Catalog::initialize_state`]: the fully-bootstrapped
/// in-memory state plus the side outputs that [`Catalog::open`] needs to finish
/// constructing a [`Catalog`].
pub struct InitializeStateResult {
    /// An initialized [`CatalogState`].
    pub state: CatalogState,
    /// A set of new shards that may need to be initialized (only used by 0dt migration).
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// A set of new builtin items.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// A list of builtin table updates corresponding to the initialized state.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// The version of the catalog that existed before initializing the catalog.
    pub last_seen_version: String,
    /// A handle to the expression cache if it's enabled.
    pub expr_cache_handle: Option<ExpressionCacheHandle>,
    /// The global expressions that were cached in `expr_cache_handle`.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// The local expressions that were NOT cached in `expr_cache_handle`.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
92
/// Everything produced by [`Catalog::open`]: the opened catalog plus the side
/// outputs callers need for bootstrap (builtin table updates, cached
/// expressions, and new/migrated collection ids).
pub struct OpenCatalogResult {
    /// An opened [`Catalog`].
    pub catalog: Catalog,
    /// A set of new shards that may need to be initialized.
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// A set of new builtin items.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// A list of builtin table updates corresponding to the initialized state.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// The global expressions that were cached in the expression cache.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// The local expressions that were NOT cached in the expression cache.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}

impl Catalog {
    /// Initializes a CatalogState. Separate from [`Catalog::open`] to avoid depending on state
    /// external to a [mz_catalog::durable::DurableCatalogState]
    /// (for example: no [mz_secrets::SecretsReader]).
    ///
    /// Bootstrapping proceeds in phases, and the ordering below is load-bearing:
    /// durable migrations run inside a transaction first, then durable updates are
    /// applied to the in-memory state in dependency order (pre-item, system-item,
    /// item, post-item), then item ASTs and builtin schemas are migrated, and the
    /// transaction is committed last, at `config.boot_ts`.
    pub async fn initialize_state<'a>(
        config: StateConfig,
        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
    ) -> Result<InitializeStateResult, AdapterError> {
        // Sanity-check that every builtin role and cluster uses a reserved name
        // prefix; a violation is a programming error, hence `assert!`.
        for builtin_role in BUILTIN_ROLES {
            assert!(
                is_reserved_name(builtin_role.name),
                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }
        for builtin_cluster in BUILTIN_CLUSTERS {
            assert!(
                is_reserved_name(builtin_cluster.name),
                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }

        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
        if config.all_features {
            system_configuration.enable_all_feature_flags_by_default();
        }

        // Start from an empty in-memory state; it is populated below by applying
        // the durable catalog's updates.
        let mut state = CatalogState {
            database_by_name: imbl::OrdMap::new(),
            database_by_id: imbl::OrdMap::new(),
            entry_by_id: imbl::OrdMap::new(),
            entry_by_global_id: imbl::OrdMap::new(),
            ambient_schemas_by_name: imbl::OrdMap::new(),
            ambient_schemas_by_id: imbl::OrdMap::new(),
            clusters_by_name: imbl::OrdMap::new(),
            clusters_by_id: imbl::OrdMap::new(),
            roles_by_name: imbl::OrdMap::new(),
            roles_by_id: imbl::OrdMap::new(),
            network_policies_by_id: imbl::OrdMap::new(),
            role_auth_by_id: imbl::OrdMap::new(),
            network_policies_by_name: imbl::OrdMap::new(),
            system_configuration: Arc::new(system_configuration),
            default_privileges: Arc::new(DefaultPrivileges::default()),
            system_privileges: Arc::new(PrivilegeMap::default()),
            comments: Arc::new(CommentsMap::default()),
            source_references: imbl::OrdMap::new(),
            storage_metadata: Arc::new(StorageMetadata::default()),
            temporary_schemas: imbl::OrdMap::new(),
            // Filled in later, once the nonce is read from the durable catalog.
            mock_authentication_nonce: Default::default(),
            config: mz_sql::catalog::CatalogConfig {
                start_time: to_datetime((config.now)()),
                start_instant: Instant::now(),
                nonce: rand::random(),
                environment_id: config.environment_id,
                session_id: Uuid::new_v4(),
                build_info: config.build_info,
                now: config.now.clone(),
                connection_context: config.connection_context,
                builtins_cfg: BuiltinsConfig {
                    // This will fall back to the default in code (false) if we timeout on the
                    // initial LD sync. We're just using this to get some additional testing in on
                    // CTs so a false negative is fine, we're only worried about false positives.
                    include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
                        &config.system_parameter_defaults,
                        config.remote_system_parameters.as_ref(),
                        &ENABLE_CONTINUAL_TASK_BUILTINS,
                    ),
                },
                helm_chart_version: config.helm_chart_version,
            },
            cluster_replica_sizes: config.cluster_replica_sizes,
            availability_zones: config.availability_zones,
            egress_addresses: config.egress_addresses,
            aws_principal_context: config.aws_principal_context,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            http_host_name: config.http_host_name,
            license_key: config.license_key,
        };

        let deploy_generation = storage.get_deployment_generation().await?;

        // NOTE(review): `sync_to_current_updates` appears to return the full
        // initial snapshot of the durable catalog, hence the non-empty assertion.
        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
        let mut txn = storage.transaction().await?;

        // Migrate/update durable data before we start loading the in-memory catalog.
        let new_builtin_collections = {
            migrate::durable_migrate(
                &mut txn,
                state.config.environment_id.organization_id(),
                config.boot_ts,
            )?;
            // Overwrite and persist selected parameter values in `remote_system_parameters` that
            // was pulled from a remote frontend (e.g. LaunchDarkly) if present.
            if let Some(remote_system_parameters) = config.remote_system_parameters {
                for (name, value) in remote_system_parameters {
                    txn.upsert_system_config(&name, value)?;
                }
                txn.set_system_config_synced_once()?;
            }
            // Add any new builtin objects and remove old ones.
            let new_builtin_collections =
                add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
                system_cluster: config.builtin_system_cluster_config,
                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
                probe_cluster: config.builtin_probe_cluster_config,
                support_cluster: config.builtin_support_cluster_config,
                analytics_cluster: config.builtin_analytics_cluster_config,
            };
            add_new_remove_old_builtin_clusters_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                config.boot_ts,
            )?;
            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
            add_new_remove_old_builtin_cluster_replicas_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                config.boot_ts,
            )?;
            add_new_remove_old_builtin_roles_migration(&mut txn)?;
            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
            remove_pending_cluster_replicas_migration(&mut txn, config.boot_ts)?;

            new_builtin_collections
        };

        // Fold the updates produced by the durable migrations above into the
        // snapshot so they are applied to the in-memory state with everything else.
        let op_updates = txn.get_and_commit_op_updates();
        updates.extend(op_updates);

        let mut builtin_table_updates = Vec::new();

        // Seed the in-memory catalog with values that don't come from the durable catalog.
        {
            // Set defaults from configuration passed in the provided `system_parameter_defaults`
            // map.
            for (name, value) in config.system_parameter_defaults {
                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
                    Ok(_) => (),
                    Err(Error {
                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
                    }) => {
                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
                    }
                    Err(e) => return Err(e.into()),
                };
            }
            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
        }

        // Make life easier by consolidating all updates, so that we end up with only positive
        // diffs.
        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
        differential_dataflow::consolidation::consolidate_updates(&mut updates);
        soft_assert_no_log!(
            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "consolidated updates should be positive during startup: {updates:?}"
        );

        // Partition the updates by kind so they can be applied in dependency
        // order: pre-item (roles, schemas, clusters, ...), then system items,
        // then user items, then post-item metadata; audit logs are handled
        // separately below.
        let mut pre_item_updates = Vec::new();
        let mut system_item_updates = Vec::new();
        let mut item_updates = Vec::new();
        let mut post_item_updates = Vec::new();
        let mut audit_log_updates = Vec::new();
        for (kind, ts, diff) in updates {
            match kind {
                BootstrapStateUpdateKind::Role(_)
                | BootstrapStateUpdateKind::RoleAuth(_)
                | BootstrapStateUpdateKind::Database(_)
                | BootstrapStateUpdateKind::Schema(_)
                | BootstrapStateUpdateKind::DefaultPrivilege(_)
                | BootstrapStateUpdateKind::SystemPrivilege(_)
                | BootstrapStateUpdateKind::SystemConfiguration(_)
                | BootstrapStateUpdateKind::Cluster(_)
                | BootstrapStateUpdateKind::NetworkPolicy(_)
                | BootstrapStateUpdateKind::ClusterReplica(_) => {
                    pre_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
                    system_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
                    kind: kind.into(),
                    ts,
                    diff: diff.try_into().expect("valid diff"),
                }),
                // Kept as raw tuples because migrations below may append more
                // post-item updates before they are converted and applied.
                BootstrapStateUpdateKind::Comment(_)
                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
                | BootstrapStateUpdateKind::SourceReferences(_)
                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
                    post_item_updates.push((kind, ts, diff));
                }
                BootstrapStateUpdateKind::AuditLog(_) => {
                    audit_log_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    });
                }
            }
        }

        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(pre_item_updates, &mut LocalExpressionCache::Closed)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // Ensure mz_system has a password if configured to have one.
        // It's important we do this after the `pre_item_updates` so that
        // the mz_system role exists in the catalog.
        {
            if let Some(password) = config.external_login_password_mz_system {
                let role_auth = RoleAuth {
                    role_id: MZ_SYSTEM_ROLE_ID,
                    // builtin roles should always use a secure scram iteration
                    // <https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html>
                    password_hash: Some(
                        scram256_hash(&password, &NonZeroU32::new(600_000).expect("known valid"))
                            .map_err(|_| {
                            AdapterError::Internal("Failed to hash mz_system password.".to_owned())
                        })?,
                    ),
                    updated_at: SYSTEM_TIME(),
                };
                state
                    .role_auth_by_id
                    .insert(MZ_SYSTEM_ROLE_ID, role_auth.clone());
                let builtin_table_update = state.generate_builtin_table_update(
                    mz_catalog::memory::objects::StateUpdateKind::RoleAuth(role_auth.into()),
                    mz_catalog::memory::objects::StateDiff::Addition,
                );
                builtin_table_updates.extend(builtin_table_update);
            }
        }

        let expr_cache_start = Instant::now();
        info!("startup: coordinator init: catalog open: expr cache open beginning");
        // We wait until after the `pre_item_updates` to open the cache so we can get accurate
        // dyncfgs because the `pre_item_updates` contains `SystemConfiguration` updates.
        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
        let expr_cache_enabled = config
            .enable_expression_cache_override
            .unwrap_or(enable_expr_cache_dyncfg);
        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
            info!(
                ?config.enable_expression_cache_override,
                ?enable_expr_cache_dyncfg,
                "using expression cache for startup"
            );
            // Collect every global ID (including extra versions) so the cache can
            // evict entries for items that no longer exist.
            let current_ids = txn
                .get_items()
                .flat_map(|item| {
                    let gid = item.global_id.clone();
                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
                    std::iter::once(gid).chain(gids.into_iter())
                })
                .chain(
                    txn.get_system_object_mappings()
                        .map(|som| som.unique_identifier.global_id),
                )
                .collect();
            let dyncfgs = config.persist_client.dyncfgs().clone();
            let build_version = if config.build_info.is_dev() {
                // A single dev version can be used for many different builds, so we need to use
                // the build version that is also enriched with build metadata.
                config
                    .build_info
                    .semver_version_build()
                    .expect("build ID is not available on your platform!")
            } else {
                config.build_info.semver_version()
            };
            let expr_cache_config = ExpressionCacheConfig {
                build_version,
                shard_id: txn
                    .get_expression_cache_shard()
                    .expect("expression cache shard should exist for opened catalogs"),
                persist: config.persist_client,
                current_ids,
                remove_prior_versions: !config.read_only,
                compact_shard: config.read_only,
                dyncfgs,
            };
            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
            (
                Some(expr_cache_handle),
                cached_local_exprs,
                cached_global_exprs,
            )
        } else {
            (None, BTreeMap::new(), BTreeMap::new())
        };
        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
        info!(
            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
            expr_cache_start.elapsed()
        );

        // When initializing/bootstrapping, we don't use the catalog updates but
        // instead load the catalog fully and then go ahead and apply commands
        // to the controller(s). Maybe we _should_ instead use the same logic
        // and return and use the updates from here. But that's at the very
        // least future work.
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(system_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // "new" marks a catalog that has never run migrations before.
        let last_seen_version =
            get_migration_version(&txn).map_or_else(|| "new".into(), |v| v.to_string());

        let mz_authentication_mock_nonce =
            txn.get_authentication_mock_nonce().ok_or_else(|| {
                Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
            })?;

        state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);

        // Migrate item ASTs.
        let (builtin_table_update, _catalog_updates) = if !config.skip_migrations {
            let migrate_result = migrate::migrate(
                &mut state,
                &mut txn,
                &mut local_expr_cache,
                item_updates,
                config.now,
                config.boot_ts,
            )
            .await
            .map_err(|e| {
                Error::new(ErrorKind::FailedCatalogMigration {
                    last_seen_version: last_seen_version.clone(),
                    this_version: config.build_info.version,
                    cause: e.to_string(),
                })
            })?;
            if !migrate_result.post_item_updates.is_empty() {
                // Include any post-item-updates generated by migrations, and then consolidate
                // them to ensure diffs are all positive.
                post_item_updates.extend(migrate_result.post_item_updates);
                // Push everything to the same timestamp so it consolidates cleanly.
                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
                    for (_, ts, _) in &mut post_item_updates {
                        *ts = max_ts;
                    }
                }
                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
            }

            (
                migrate_result.builtin_table_updates,
                migrate_result.catalog_updates,
            )
        } else {
            // Migrations skipped: apply the item updates as-is.
            state
                .apply_updates(item_updates, &mut local_expr_cache)
                .await
        };
        builtin_table_updates.extend(builtin_table_update);

        let post_item_updates = post_item_updates
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate {
                kind: kind.into(),
                ts,
                diff: diff.try_into().expect("valid diff"),
            })
            .collect();
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(post_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // We don't need to apply the audit logs in memory, yet apply can be expensive when the
        // audit log grows large. Therefore, we skip the apply step and just generate the builtin
        // updates.
        for audit_log_update in audit_log_updates {
            builtin_table_updates.extend(
                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
            );
        }

        // Migrate builtin items.
        let schema_migration_result = builtin_schema_migration::run(
            config.build_info,
            deploy_generation,
            &mut txn,
            config.builtin_item_migration_config,
        )
        .await?;

        let state_updates = txn.get_and_commit_op_updates();

        // When initializing/bootstrapping, we don't use the catalog updates but
        // instead load the catalog fully and then go ahead and apply commands
        // to the controller(s). Maybe we _should_ instead use the same logic
        // and return and use the updates from here. But that's at the very
        // least future work.
        let (table_updates, _catalog_updates) = state
            .apply_updates(state_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(table_updates);
        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);

        // Bump the migration version immediately before committing.
        set_migration_version(&mut txn, config.build_info.semver_version())?;

        txn.commit(config.boot_ts).await?;

        // Now that the migration is durable, run any requested deferred cleanup.
        schema_migration_result.cleanup_action.await;

        Ok(InitializeStateResult {
            state,
            migrated_storage_collections_0dt: schema_migration_result.replaced_items,
            new_builtin_collections: new_builtin_collections.into_iter().collect(),
            builtin_table_updates,
            last_seen_version,
            expr_cache_handle,
            cached_global_exprs,
            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
        })
    }
544    /// Opens or creates a catalog that stores data at `path`.
545    ///
546    /// Returns the catalog, metadata about builtin objects that have changed
547    /// schemas since last restart, a list of updates to builtin tables that
548    /// describe the initial state of the catalog, and the version of the
549    /// catalog before any migrations were performed.
550    ///
551    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 17KB. This would
552    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
553    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
554    #[instrument(name = "catalog::open")]
555    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
556        async move {
557            let mut storage = config.storage;
558
559            let InitializeStateResult {
560                state,
561                migrated_storage_collections_0dt,
562                new_builtin_collections,
563                mut builtin_table_updates,
564                last_seen_version: _,
565                expr_cache_handle,
566                cached_global_exprs,
567                uncached_local_exprs,
568            } =
569                // BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
570                // get stored on the stack which is bad for runtime performance, and blow up our stack usage.
571                // Because of that we purposefully move this Future onto the heap (i.e. Box it).
572                Self::initialize_state(config.state, &mut storage)
573                    .instrument(tracing::info_span!("catalog::initialize_state"))
574                    .boxed()
575                    .await?;
576
577            let catalog = Catalog {
578                state,
579                plans: CatalogPlans::default(),
580                expr_cache_handle,
581                transient_revision: 1,
582                storage: Arc::new(tokio::sync::Mutex::new(storage)),
583            };
584
585            // Operators aren't stored in the catalog, but we would like them in
586            // introspection views.
587            for (op, func) in OP_IMPLS.iter() {
588                match func {
589                    mz_sql::func::Func::Scalar(impls) => {
590                        for imp in impls {
591                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
592                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
593                            ));
594                        }
595                    }
596                    _ => unreachable!("all operators must be scalar functions"),
597                }
598            }
599
600            for ip in &catalog.state.egress_addresses {
601                builtin_table_updates.push(
602                    catalog
603                        .state
604                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
605                );
606            }
607
608            if !catalog.state.license_key.id.is_empty() {
609                builtin_table_updates.push(
610                    catalog.state.resolve_builtin_table_update(
611                        catalog
612                            .state
613                            .pack_license_key_update(&catalog.state.license_key)?,
614                    ),
615                );
616            }
617
618            catalog.storage().await.mark_bootstrap_complete().await;
619
620            Ok(OpenCatalogResult {
621                catalog,
622                migrated_storage_collections_0dt,
623                new_builtin_collections,
624                builtin_table_updates,
625                cached_global_exprs,
626                uncached_local_exprs,
627            })
628        }
629        .instrument(tracing::info_span!("catalog::open"))
630        .boxed()
631    }
    /// Initializes STORAGE to understand all shards that `self` expects to
    /// exist.
    ///
    /// Note that this must be done before creating/rendering collections
    /// because the storage controller might not be aware of new system
    /// collections created between versions.
    async fn initialize_storage_state(
        &mut self,
        storage_collections: &Arc<
            dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
        >,
    ) -> Result<(), mz_catalog::durable::CatalogError> {
        // Gather the global IDs of every storage-backed collection in the catalog.
        let collections = self
            .entries()
            .filter(|entry| entry.item().is_storage_collection())
            .flat_map(|entry| entry.global_ids())
            .collect();

        // Clone the state so that any errors that occur do not leak any
        // transformations on error. The clone is only written back to `self`
        // after the transaction commits successfully.
        let mut state = self.state.clone();

        let mut storage = self.storage().await;
        let shard_id = storage.shard_id();
        let mut txn = storage.transaction().await?;

        // Ensure the storage controller knows about the catalog shard and associates it with the
        // `MZ_CATALOG_RAW` builtin source.
        let item_id = self.resolve_builtin_storage_collection(&MZ_CATALOG_RAW);
        let global_id = self.get_entry(&item_id).latest_global_id();
        match txn.get_collection_metadata().get(&global_id) {
            None => {
                txn.insert_collection_metadata([(global_id, shard_id)].into())
                    .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            }
            // A mapping already exists; it must reference the same catalog shard.
            Some(id) => assert_eq!(*id, shard_id),
        }

        storage_collections
            .initialize_state(&mut txn, collections)
            .await
            .map_err(mz_catalog::durable::DurableCatalogError::from)?;

        // Apply whatever the storage layer wrote into the transaction to the cloned
        // in-memory state. Storage initialization is metadata-only bookkeeping, so
        // it must not surface as builtin table or catalog/controller state changes.
        let updates = txn.get_and_commit_op_updates();
        let (builtin_updates, catalog_updates) = state
            .apply_updates(updates, &mut LocalExpressionCache::Closed)
            .await;
        assert!(
            builtin_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would cause changes to builtin tables"
        );
        assert!(
            catalog_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would change the catalog or controller state"
        );
        let commit_ts = txn.upper();
        txn.commit(commit_ts).await?;
        drop(storage);

        // Save updated state.
        self.state = state;
        Ok(())
    }
696
    /// [`mz_controller::Controller`] depends on durable catalog state to boot,
    /// so make it available and initialize the controller.
    ///
    /// After construction, storage state is initialized via
    /// `initialize_storage_state` so the storage controller learns about every
    /// shard the catalog expects to exist.
    pub async fn initialize_controller(
        &mut self,
        config: mz_controller::ControllerConfig,
        envd_epoch: core::num::NonZeroI64,
        read_only: bool,
    ) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
    {
        let controller_start = Instant::now();
        info!("startup: controller init: beginning");

        let controller = {
            let mut storage = self.storage().await;
            let mut tx = storage.transaction().await?;
            // Prepare the durable state the controller requires; this step must
            // not generate any catalog updates of its own (asserted below).
            mz_controller::prepare_initialization(&mut tx)
                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            let updates = tx.get_and_commit_op_updates();
            assert!(
                updates.is_empty(),
                "initializing controller should not produce updates: {updates:?}"
            );
            let commit_ts = tx.upper();
            tx.commit(commit_ts).await?;

            // A fresh transaction that the controller only reads from while booting.
            let read_only_tx = storage.transaction().await?;

            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
        };

        // Tell the storage controller about all shards the catalog expects.
        self.initialize_storage_state(&controller.storage_collections)
            .await?;

        info!(
            "startup: controller init: complete in {:?}",
            controller_start.elapsed()
        );

        Ok(controller)
    }
737
738    /// Politely releases all external resources that can only be released in an async context.
739    pub async fn expire(self) {
740        // If no one else holds a reference to storage, then clean up the storage resources.
741        // Otherwise, hopefully the other reference cleans up the resources when it's dropped.
742        if let Some(storage) = Arc::into_inner(self.storage) {
743            let storage = storage.into_inner();
744            storage.expire().await;
745        }
746    }
747}
748
749impl CatalogState {
750    /// Set the default value for `name`, which is the value it will be reset to.
751    fn set_system_configuration_default(
752        &mut self,
753        name: &str,
754        value: VarInput,
755    ) -> Result<(), Error> {
756        Ok(Arc::make_mut(&mut self.system_configuration).set_default(name, value)?)
757    }
758}
759
/// Updates the catalog with new and removed builtin items.
///
/// Diffs the builtin items compiled into the binary against the ones persisted in
/// the durable catalog: new items are inserted, deleted items are removed (along
/// with their comments), and column comments are refreshed for all builtin relations.
///
/// Returns the list of new builtin [`GlobalId`]s.
fn add_new_remove_old_builtin_items_migration(
    builtins_cfg: &BuiltinsConfig,
    txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<Vec<GlobalId>, mz_catalog::durable::CatalogError> {
    let mut new_builtin_mappings = Vec::new();
    // Used to validate unique descriptions.
    let mut builtin_descs = HashSet::new();

    // We compare the builtin items that are compiled into the binary with the builtin items that
    // are persisted in the catalog to discover new and deleted builtin items.
    let mut builtins = Vec::new();
    for builtin in BUILTINS::iter(builtins_cfg) {
        let desc = SystemObjectDescription {
            schema_name: builtin.schema().to_string(),
            object_type: builtin.catalog_item_type(),
            object_name: builtin.name().to_string(),
        };
        // Validate that the description is unique.
        if !builtin_descs.insert(desc.clone()) {
            panic!(
                "duplicate builtin description: {:?}, {:?}",
                SystemObjectDescription {
                    schema_name: builtin.schema().to_string(),
                    object_type: builtin.catalog_item_type(),
                    object_name: builtin.name().to_string(),
                },
                builtin
            );
        }
        builtins.push((desc, builtin));
    }

    // Index the durably persisted mappings by description. Matched entries are
    // removed below, so whatever remains afterwards corresponds to deleted builtins.
    let mut system_object_mappings: BTreeMap<_, _> = txn
        .get_system_object_mappings()
        .map(|system_object_mapping| {
            (
                system_object_mapping.description.clone(),
                system_object_mapping,
            )
        })
        .collect();

    // Partition compiled-in builtins into those that already exist durably and
    // those that are brand new in this version.
    let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
        builtins.into_iter().partition_map(|(desc, builtin)| {
            // Runtime-alterable objects have no meaningful compiled-in fingerprint;
            // a sentinel value marks them instead.
            let fingerprint = match builtin.runtime_alterable() {
                false => builtin.fingerprint(),
                true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
            };
            match system_object_mappings.remove(&desc) {
                Some(system_object_mapping) => {
                    Either::Left((builtin, system_object_mapping, fingerprint))
                }
                None => Either::Right((builtin, fingerprint)),
            }
        });
    // Allocate a (catalog id, global id) pair for every new builtin item.
    let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
    let new_builtins: Vec<_> = new_builtins
        .into_iter()
        .zip_eq(new_builtin_ids.clone())
        .collect();

    // Add new builtin items to catalog.
    for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
        new_builtin_mappings.push(SystemObjectMapping {
            description: SystemObjectDescription {
                schema_name: builtin.schema().to_string(),
                object_type: builtin.catalog_item_type(),
                object_name: builtin.name().to_string(),
            },
            unique_identifier: SystemObjectUniqueIdentifier {
                catalog_id,
                global_id,
                fingerprint,
            },
        });

        // Runtime-alterable system objects are durably recorded to the
        // usual items collection, so that they can be later altered at
        // runtime by their owner (i.e., outside of the usual builtin
        // migration framework that requires changes to the binary
        // itself).
        let handled_runtime_alterable = match builtin {
            Builtin::Connection(c) if c.runtime_alterable => {
                // The owner's privileges plus the statically declared access list.
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    c.owner_id.clone(),
                )];
                acl_items.extend_from_slice(c.access);
                // Builtin Connections cannot be versioned.
                let versions = BTreeMap::new();

                txn.insert_item(
                    catalog_id,
                    c.oid,
                    global_id,
                    mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
                    c.name,
                    c.sql.into(),
                    *c.owner_id,
                    acl_items,
                    versions,
                )?;
                true
            }
            _ => false,
        };
        // Every runtime-alterable builtin must have been handled above, and only those.
        assert_eq!(
            builtin.runtime_alterable(),
            handled_runtime_alterable,
            "runtime alterable object was not handled by migration",
        );
    }
    txn.set_system_object_mappings(new_builtin_mappings)?;

    // Update comments of all builtin objects
    let builtins_with_catalog_ids = existing_builtins
        .iter()
        .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
        .chain(
            new_builtins
                .into_iter()
                .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
        );

    for (builtin, id) in builtins_with_catalog_ids {
        // Only relation-like builtins carry column comments; all other kinds are skipped.
        let (comment_id, desc, comments) = match builtin {
            Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
            Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
            Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
            Builtin::MaterializedView(mv) => (
                CommentObjectId::MaterializedView(id),
                &mv.desc,
                &mv.column_comments,
            ),
            Builtin::Log(_)
            | Builtin::Type(_)
            | Builtin::Func(_)
            | Builtin::ContinualTask(_)
            | Builtin::Index(_)
            | Builtin::Connection(_) => continue,
        };
        // Drop any stale comments before re-inserting the compiled-in ones.
        txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;

        let mut comments = comments.clone();
        for (col_idx, name) in desc.iter_names().enumerate() {
            if let Some(comment) = comments.remove(name.as_str()) {
                // Comment column indices are 1 based
                txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
            }
        }
        // Any comment left over names a column that does not exist in the relation.
        assert!(
            comments.is_empty(),
            "builtin object contains dangling comments that don't correspond to columns {comments:?}"
        );
    }

    // Anything left in `system_object_mappings` must have been deleted and should be removed from
    // the catalog.
    let mut deleted_system_objects = BTreeSet::new();
    let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
    let mut deleted_comments = BTreeSet::new();
    for (desc, mapping) in system_object_mappings {
        deleted_system_objects.insert(mapping.description);
        // Runtime-alterable objects (identified by the sentinel fingerprint) also
        // have an entry in the items collection that must be removed.
        if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
            deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
        }

        let id = mapping.unique_identifier.catalog_id;
        // Only relation-like objects carry comments; other kinds have none to drop.
        let comment_id = match desc.object_type {
            CatalogItemType::Table => CommentObjectId::Table(id),
            CatalogItemType::Source => CommentObjectId::Source(id),
            CatalogItemType::View => CommentObjectId::View(id),
            CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(id),
            CatalogItemType::Sink
            | CatalogItemType::Index
            | CatalogItemType::Type
            | CatalogItemType::Func
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => continue,
        };
        deleted_comments.insert(comment_id);
    }
    // If you are 100% positive that it is safe to delete a system object outside any of the
    // unstable schemas, then add it to this set. Make sure that no prod environments are
    // using this object and that the upgrade checker does not show any issues.
    //
    // Objects can be removed from this set after one release.
    let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
    // TODO(jkosh44) Technically we could support changing the type of a builtin object outside
    // of unstable schemas (i.e. from a table to a view). However, builtin migrations don't currently
    // handle that scenario correctly.
    assert!(
        deleted_system_objects
            .iter()
            // It's okay if Indexes change because they're inherently ephemeral.
            .filter(|object| object.object_type != CatalogItemType::Index)
            .all(
                |deleted_object| is_unstable_schema(&deleted_object.schema_name)
                    || delete_exceptions.contains(deleted_object)
            ),
        "only objects in unstable schemas can be deleted, deleted objects: {:?}",
        deleted_system_objects
    );
    txn.drop_comments(&deleted_comments)?;
    txn.remove_items(&deleted_runtime_alterable_system_ids)?;
    txn.remove_system_object_mappings(deleted_system_objects)?;

    // Filter down to just the GlobalIds which are used to track the underlying collections.
    let new_builtin_collections = new_builtin_ids
        .into_iter()
        .map(|(_catalog_id, global_id)| global_id)
        .collect();

    Ok(new_builtin_collections)
}
979
/// Creates builtin clusters that are new in this version and removes system
/// clusters that no longer exist in the binary, emitting an audit-log event for
/// every creation and drop, stamped with `boot_ts`.
fn add_new_remove_old_builtin_clusters_migration(
    txn: &mut mz_catalog::durable::Transaction<'_>,
    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
    boot_ts: Timestamp,
) -> Result<(), mz_catalog::durable::CatalogError> {
    // Index durable system clusters by name. Matched entries are removed below,
    // so whatever remains afterwards is stale and must be dropped.
    let mut durable_clusters: BTreeMap<_, _> = txn
        .get_clusters()
        .filter(|cluster| cluster.id.is_system())
        .map(|cluster| (cluster.name.to_string(), cluster))
        .collect();

    // Add new clusters.
    for builtin_cluster in BUILTIN_CLUSTERS {
        if durable_clusters.remove(builtin_cluster.name).is_none() {
            // Size and replication factor come from the bootstrap configuration.
            let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;

            let cluster_id = txn.insert_system_cluster(
                builtin_cluster.name,
                vec![],
                builtin_cluster.privileges.to_vec(),
                builtin_cluster.owner_id.to_owned(),
                mz_catalog::durable::ClusterConfig {
                    variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
                        size: cluster_config.size,
                        availability_zones: vec![],
                        replication_factor: cluster_config.replication_factor,
                        logging: default_logging_config(),
                        optimizer_feature_overrides: Default::default(),
                        schedule: Default::default(),
                    }),
                    workload_class: None,
                },
                &HashSet::new(),
            )?;

            // Record the creation in the audit log.
            let audit_id = txn.allocate_audit_log_id()?;
            txn.insert_audit_log_event(VersionedEvent::new(
                audit_id,
                EventType::Create,
                ObjectType::Cluster,
                EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                    id: cluster_id.to_string(),
                    name: builtin_cluster.name.to_string(),
                }),
                None,
                boot_ts.into(),
            ));
        }
    }

    // Remove old clusters.
    let old_clusters = durable_clusters
        .values()
        .map(|cluster| cluster.id)
        .collect();
    txn.remove_clusters(&old_clusters)?;

    // Record a drop event for each removed cluster.
    for (_name, cluster) in &durable_clusters {
        let audit_id = txn.allocate_audit_log_id()?;
        txn.insert_audit_log_event(VersionedEvent::new(
            audit_id,
            EventType::Drop,
            ObjectType::Cluster,
            EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                id: cluster.id.to_string(),
                name: cluster.name.clone(),
            }),
            None,
            boot_ts.into(),
        ));
    }

    Ok(())
}
1054
1055fn add_new_remove_old_builtin_introspection_source_migration(
1056    txn: &mut mz_catalog::durable::Transaction<'_>,
1057) -> Result<(), AdapterError> {
1058    let mut new_indexes = Vec::new();
1059    let mut removed_indexes = BTreeSet::new();
1060    for cluster in txn.get_clusters() {
1061        let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
1062
1063        let mut new_logs = Vec::new();
1064
1065        for log in BUILTINS::logs() {
1066            if introspection_source_index_ids.remove(log.name).is_none() {
1067                new_logs.push(log);
1068            }
1069        }
1070
1071        for log in new_logs {
1072            let (item_id, gid) =
1073                Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1074            new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1075        }
1076
1077        // Anything left in `introspection_source_index_ids` must have been deleted and should be
1078        // removed from the catalog.
1079        removed_indexes.extend(
1080            introspection_source_index_ids
1081                .into_keys()
1082                .map(|name| (cluster.id, name.to_string())),
1083        );
1084    }
1085    txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1086    txn.remove_introspection_source_indexes(removed_indexes)?;
1087    Ok(())
1088}
1089
1090fn add_new_remove_old_builtin_roles_migration(
1091    txn: &mut mz_catalog::durable::Transaction<'_>,
1092) -> Result<(), mz_catalog::durable::CatalogError> {
1093    let mut durable_roles: BTreeMap<_, _> = txn
1094        .get_roles()
1095        .filter(|role| role.id.is_system() || role.id.is_predefined())
1096        .map(|role| (role.name.to_string(), role))
1097        .collect();
1098
1099    // Add new roles.
1100    for builtin_role in BUILTIN_ROLES {
1101        if durable_roles.remove(builtin_role.name).is_none() {
1102            txn.insert_builtin_role(
1103                builtin_role.id,
1104                builtin_role.name.to_string(),
1105                builtin_role.attributes.clone(),
1106                RoleMembership::new(),
1107                RoleVars::default(),
1108                builtin_role.oid,
1109            )?;
1110        }
1111    }
1112
1113    // Remove old roles.
1114    let old_roles = durable_roles.values().map(|role| role.id).collect();
1115    txn.remove_roles(&old_roles)?;
1116
1117    Ok(())
1118}
1119
/// Creates builtin cluster replicas that are new in this version and removes
/// system replicas that no longer exist in the binary, emitting an audit-log
/// event (stamped with `boot_ts`) for every creation and drop.
fn add_new_remove_old_builtin_cluster_replicas_migration(
    txn: &mut Transaction<'_>,
    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
    boot_ts: Timestamp,
) -> Result<(), AdapterError> {
    // Clusters by name, for resolving the cluster a builtin replica belongs to.
    let cluster_lookup: BTreeMap<_, _> = txn
        .get_clusters()
        .map(|cluster| (cluster.name.clone(), cluster.clone()))
        .collect();

    // Reverse lookup (id -> name) for naming clusters in drop audit events.
    let cluster_id_to_name: BTreeMap<ClusterId, String> = cluster_lookup
        .values()
        .map(|cluster| (cluster.id, cluster.name.clone()))
        .collect();

    // Durable system replicas grouped by cluster and indexed by replica name.
    // Matched replicas are removed below; whatever remains is stale.
    let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
        .get_cluster_replicas()
        .filter(|replica| replica.replica_id.is_system())
        .fold(BTreeMap::new(), |mut acc, replica| {
            acc.entry(replica.cluster_id)
                .or_insert_with(BTreeMap::new)
                .insert(replica.name.to_string(), replica);
            acc
        });

    // Add new replicas.
    for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
        let cluster = cluster_lookup
            .get(builtin_replica.cluster_name)
            .expect("builtin cluster replica references non-existent cluster");
        // `empty_map` is a hack to simplify the if statement below.
        let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
        let replica_names = durable_replicas
            .get_mut(&cluster.id)
            .unwrap_or(&mut empty_map);

        let builtin_cluster_bootstrap_config =
            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
        if replica_names.remove(builtin_replica.name).is_none()
            // NOTE(SangJunBak): We need to explicitly check the replication factor because
            // BUILT_IN_CLUSTER_REPLICAS is constant throughout all deployments but the replication
            // factor is configurable on bootstrap.
            && builtin_cluster_bootstrap_config.replication_factor > 0
        {
            // For managed clusters, the replica inherits the cluster's size;
            // otherwise fall back to the bootstrap-configured size.
            let replica_size = match cluster.config.variant {
                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
                ClusterVariant::Unmanaged => builtin_cluster_bootstrap_config.size.clone(),
            };

            let config = builtin_cluster_replica_config(replica_size.clone());
            let replica_id = txn.insert_cluster_replica(
                cluster.id,
                builtin_replica.name,
                config,
                MZ_SYSTEM_ROLE_ID,
            )?;

            // Record the creation in the audit log.
            let audit_id = txn.allocate_audit_log_id()?;
            txn.insert_audit_log_event(VersionedEvent::new(
                audit_id,
                EventType::Create,
                ObjectType::ClusterReplica,
                EventDetails::CreateClusterReplicaV4(mz_audit_log::CreateClusterReplicaV4 {
                    cluster_id: cluster.id.to_string(),
                    cluster_name: cluster.name.clone(),
                    replica_id: Some(replica_id.to_string()),
                    replica_name: builtin_replica.name.to_string(),
                    logical_size: replica_size,
                    billed_as: None,
                    internal: false,
                    reason: CreateOrDropClusterReplicaReasonV1::System,
                    scheduling_policies: None,
                }),
                None,
                boot_ts.into(),
            ));
        }
    }

    // Remove old replicas.
    let old_replicas: Vec<_> = durable_replicas
        .values()
        .flat_map(|replicas| replicas.values())
        .collect();
    let old_replica_ids = old_replicas.iter().map(|r| r.replica_id).collect();
    txn.remove_cluster_replicas(&old_replica_ids)?;

    // Record a drop event for each removed replica.
    for replica in &old_replicas {
        let cluster_name = cluster_id_to_name
            .get(&replica.cluster_id)
            .cloned()
            .unwrap_or_else(|| "<unknown>".to_string());

        let audit_id = txn.allocate_audit_log_id()?;
        txn.insert_audit_log_event(VersionedEvent::new(
            audit_id,
            EventType::Drop,
            ObjectType::ClusterReplica,
            EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
                cluster_id: replica.cluster_id.to_string(),
                cluster_name,
                replica_id: Some(replica.replica_id.to_string()),
                replica_name: replica.name.clone(),
                reason: CreateOrDropClusterReplicaReasonV1::System,
                scheduling_policies: None,
            }),
            None,
            boot_ts.into(),
        ));
    }

    Ok(())
}
1233
1234/// Roles can have default values for configuration parameters, e.g. you can set a Role default for
1235/// the 'cluster' parameter.
1236///
1237/// This migration exists to remove the Role default for a configuration parameter, if the persisted
1238/// input is no longer valid. For example if we remove a configuration parameter or change the
1239/// accepted set of values.
1240fn remove_invalid_config_param_role_defaults_migration(
1241    txn: &mut Transaction<'_>,
1242) -> Result<(), AdapterError> {
1243    static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1244
1245    let roles_to_migrate: BTreeMap<_, _> = txn
1246        .get_roles()
1247        .filter_map(|mut role| {
1248            // Create an empty SessionVars just so we can check if a var is valid.
1249            //
1250            // TODO(parkmycar): This is a bit hacky, instead we should have a static list of all
1251            // session variables.
1252            let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1253
1254            // Iterate over all of the variable defaults for this role.
1255            let mut invalid_roles_vars = BTreeMap::new();
1256            for (name, value) in &role.vars.map {
1257                // If one does not exist or its value is invalid, then mark it for removal.
1258                let Ok(session_var) = session_vars.inspect(name) else {
1259                    invalid_roles_vars.insert(name.clone(), value.clone());
1260                    continue;
1261                };
1262                if session_var.check(value.borrow()).is_err() {
1263                    invalid_roles_vars.insert(name.clone(), value.clone());
1264                }
1265            }
1266
1267            // If the role has no invalid values, nothing to do!
1268            if invalid_roles_vars.is_empty() {
1269                return None;
1270            }
1271
1272            tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1273
1274            // Otherwise, remove the variables from the role and return it to be updated.
1275            for (name, _value) in invalid_roles_vars {
1276                role.vars.map.remove(&name);
1277            }
1278            Some(role)
1279        })
1280        .map(|role| (role.id, role))
1281        .collect();
1282
1283    txn.update_roles_without_auth(roles_to_migrate)?;
1284
1285    Ok(())
1286}
1287
1288/// Cluster Replicas may be created ephemerally during an alter statement, these replicas
1289/// are marked as pending and should be cleaned up on catalog open.
1290fn remove_pending_cluster_replicas_migration(
1291    tx: &mut Transaction,
1292    boot_ts: mz_repr::Timestamp,
1293) -> Result<(), anyhow::Error> {
1294    // Build a map of cluster_id -> cluster_name for audit events.
1295    let cluster_names: BTreeMap<_, _> = tx.get_clusters().map(|c| (c.id, c.name)).collect();
1296
1297    let occurred_at = boot_ts.into();
1298
1299    for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1300        if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1301            replica.config.location
1302        {
1303            let cluster_name = cluster_names
1304                .get(&replica.cluster_id)
1305                .cloned()
1306                .unwrap_or_else(|| "<unknown>".to_string());
1307
1308            info!(
1309                "removing pending cluster replica '{}' from cluster '{}'",
1310                replica.name, cluster_name,
1311            );
1312
1313            tx.remove_cluster_replica(replica.replica_id)?;
1314
1315            // Emit an audit log event so that the drop is visible in
1316            // mz_audit_events, matching the create event that was
1317            // recorded when the pending replica was first created.
1318            let audit_id = tx.allocate_audit_log_id()?;
1319            tx.insert_audit_log_event(VersionedEvent::new(
1320                audit_id,
1321                EventType::Drop,
1322                ObjectType::ClusterReplica,
1323                EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1324                    cluster_id: replica.cluster_id.to_string(),
1325                    cluster_name,
1326                    replica_id: Some(replica.replica_id.to_string()),
1327                    replica_name: replica.name,
1328                    reason: CreateOrDropClusterReplicaReasonV1::System,
1329                    scheduling_policies: None,
1330                }),
1331                None,
1332                occurred_at,
1333            ));
1334        }
1335    }
1336    Ok(())
1337}
1338
1339pub(crate) fn builtin_cluster_replica_config(
1340    replica_size: String,
1341) -> mz_catalog::durable::ReplicaConfig {
1342    mz_catalog::durable::ReplicaConfig {
1343        location: mz_catalog::durable::ReplicaLocation::Managed {
1344            availability_zone: None,
1345            billed_as: None,
1346            pending: false,
1347            internal: false,
1348            size: replica_size,
1349        },
1350        logging: default_logging_config(),
1351    }
1352}
1353
1354fn default_logging_config() -> ReplicaLogging {
1355    ReplicaLogging {
1356        log_logging: false,
1357        interval: Some(Duration::from_secs(1)),
1358    }
1359}
1360
/// Bootstrap configuration (size and replication factor) for each builtin
/// cluster, resolved by name via `get_config`.
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Size and replication factor to default system_cluster on bootstrap
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default catalog_server_cluster on bootstrap
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default probe_cluster on bootstrap
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default support_cluster on bootstrap
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default analytics_cluster on bootstrap
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1374
1375impl BuiltinBootstrapClusterConfigMap {
1376    /// Gets the size of the builtin cluster based on the provided name
1377    fn get_config(
1378        &self,
1379        cluster_name: &str,
1380    ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1381        let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1382            &self.system_cluster
1383        } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1384            &self.catalog_server_cluster
1385        } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1386            &self.probe_cluster
1387        } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1388            &self.support_cluster
1389        } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1390            &self.analytics_cluster
1391        } else {
1392            return Err(mz_catalog::durable::CatalogError::Catalog(
1393                SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1394            ));
1395        };
1396        Ok(cluster_config.clone())
1397    }
1398}
1399
1400/// Convert `updates` into a `Vec` that can be consolidated by doing the following:
1401///
1402///   - Convert each update into a type that implements [`std::cmp::Ord`].
1403///   - Update the timestamp of each update to the same value.
1404///   - Convert the diff of each update to a type that implements
1405///     [`differential_dataflow::difference::Semigroup`].
1406///
1407/// [`mz_catalog::memory::objects::StateUpdateKind`] does not implement [`std::cmp::Ord`] only
1408/// because it contains a variant for temporary items, which do not implement [`std::cmp::Ord`].
1409/// However, we know that during bootstrap no temporary items exist, because they are not persisted
1410/// and are only created after bootstrap is complete. So we forcibly convert each
1411/// [`mz_catalog::memory::objects::StateUpdateKind`] into an [`BootstrapStateUpdateKind`], which is
1412/// identical to [`mz_catalog::memory::objects::StateUpdateKind`] except it doesn't have a
1413/// temporary item variant and does implement [`std::cmp::Ord`].
1414///
1415/// WARNING: Do not call outside of startup.
1416pub(crate) fn into_consolidatable_updates_startup(
1417    updates: Vec<StateUpdate>,
1418    ts: Timestamp,
1419) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1420    updates
1421        .into_iter()
1422        .map(|StateUpdate { kind, ts: _, diff }| {
1423            let kind: BootstrapStateUpdateKind = kind
1424                .try_into()
1425                .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1426            (kind, ts, Diff::from(diff))
1427        })
1428        .collect()
1429}
1430
1431fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1432    defaults: &BTreeMap<String, String>,
1433    remote: Option<&BTreeMap<String, String>>,
1434    cfg: &mz_dyncfg::Config<T>,
1435) -> T::ConfigType {
1436    let mut val = T::into_config_type(cfg.default().clone());
1437    let get_fn = |map: &BTreeMap<String, String>| {
1438        let val = map.get(cfg.name())?;
1439        match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1440            Ok(x) => Some(x),
1441            Err(err) => {
1442                tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1443                None
1444            }
1445        }
1446    };
1447    if let Some(x) = get_fn(defaults) {
1448        val = x;
1449    }
1450    if let Some(x) = remote.and_then(get_fn) {
1451        val = x;
1452    }
1453    val
1454}