Skip to main content

mz_adapter/catalog/
open.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to opening a [`Catalog`].
11
12mod builtin_schema_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU32;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use futures::future::{BoxFuture, FutureExt};
20use itertools::{Either, Itertools};
21use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
22use mz_adapter_types::dyncfgs::ENABLE_EXPRESSION_CACHE;
23use mz_audit_log::{
24    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, ObjectType, VersionedEvent,
25};
26use mz_auth::hash::scram256_hash;
27use mz_catalog::SYSTEM_CONN_ID;
28use mz_catalog::builtin::{
29    BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
30    Fingerprint, MZ_CATALOG_RAW, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
31};
32use mz_catalog::config::StateConfig;
33use mz_catalog::durable::objects::{
34    SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
35};
36use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
37use mz_catalog::expr_cache::{
38    ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
39};
40use mz_catalog::memory::error::{Error, ErrorKind};
41use mz_catalog::memory::objects::{
42    BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
43};
44use mz_controller::clusters::ReplicaLogging;
45use mz_controller_types::ClusterId;
46use mz_ore::cast::usize_to_u64;
47use mz_ore::collections::HashSet;
48use mz_ore::now::{SYSTEM_TIME, to_datetime};
49use mz_ore::{instrument, soft_assert_no_log};
50use mz_repr::adt::mz_acl_item::PrivilegeMap;
51use mz_repr::namespaces::is_unstable_schema;
52use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
53use mz_sql::catalog::{CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars};
54use mz_sql::func::OP_IMPLS;
55use mz_sql::names::CommentObjectId;
56use mz_sql::rbac;
57use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
58use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
59use mz_storage_client::controller::{StorageMetadata, StorageTxn};
60use mz_storage_client::storage_collections::StorageCollections;
61use tracing::{Instrument, info, warn};
62use uuid::Uuid;
63
64// DO NOT add any more imports from `crate` outside of `crate::catalog`.
65use crate::AdapterError;
66use crate::catalog::migrate::{self, get_migration_version, set_migration_version};
67use crate::catalog::state::LocalExpressionCache;
68use crate::catalog::{BuiltinTableUpdate, Catalog, CatalogState, Config, is_reserved_name};
69
/// Everything that [`Catalog::initialize_state`] hands back to its caller.
70pub struct InitializeStateResult {
71    /// An initialized [`CatalogState`].
72    pub state: CatalogState,
73    /// A set of new shards that may need to be initialized (only used by 0dt migration).
    /// Populated from the `replaced_items` reported by the builtin schema migration.
74    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
75    /// A set of new builtin items.
    /// Collected from the result of `add_new_remove_old_builtin_items_migration`.
76    pub new_builtin_collections: BTreeSet<GlobalId>,
77    /// A list of builtin table updates corresponding to the initialized state.
78    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
79    /// The version of the catalog that existed before initializing the catalog.
    /// This is the string `"new"` when `get_migration_version` finds no stored version.
80    pub last_seen_version: String,
81    /// A handle to the expression cache if it's enabled.
82    pub expr_cache_handle: Option<ExpressionCacheHandle>,
83    /// The global expressions that were cached in `expr_cache_handle`.
    /// Empty when the expression cache is disabled.
84    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
85    /// The local expressions that were NOT cached in `expr_cache_handle`.
    /// Taken from `LocalExpressionCache::into_uncached_exprs` once all updates are applied.
86    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
87}
88
/// Everything that [`Catalog::open`] hands back to its caller.
///
/// Apart from `catalog` and `builtin_table_updates`, the fields are forwarded unchanged from
/// the [`InitializeStateResult`] that `open` obtains from [`Catalog::initialize_state`].
89pub struct OpenCatalogResult {
90    /// An opened [`Catalog`].
91    pub catalog: Catalog,
92    /// A set of new shards that may need to be initialized.
93    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
94    /// A set of new builtin items.
95    pub new_builtin_collections: BTreeSet<GlobalId>,
96    /// A list of builtin table updates corresponding to the initialized state.
    /// `open` appends updates for operators, egress addresses and (when its id is non-empty)
    /// the license key to the updates produced by state initialization.
97    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
98    /// The global expressions that were cached in the expression cache.
99    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
100    /// The local expressions that were NOT cached in the expression cache.
101    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
102}
103
104impl Catalog {
105    /// Initializes a CatalogState. Separate from [`Catalog::open`] to avoid depending on state
106    /// external to a [mz_catalog::durable::DurableCatalogState]
107    /// (for example: no [mz_secrets::SecretsReader]).
    ///
    /// In order, this function:
    ///
    /// 1. builds an empty [`CatalogState`] from `config`,
    /// 2. reads the current updates from `storage` and, inside one transaction, runs the
    ///    durable migrations and the builtin item/cluster/replica/role migrations,
    /// 3. consolidates all updates and applies them to the in-memory state in phases
    ///    (pre-item, system item, item, post-item), opening the expression cache after the
    ///    pre-item phase,
    /// 4. runs the builtin schema migration, sets the migration version and commits the
    ///    transaction at `config.boot_ts`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the durable catalog and from the individual migrations. It also
    /// fails if the `mz_system` password cannot be hashed, if the transaction has no
    /// authentication mock nonce, or if item migration fails (reported as
    /// `ErrorKind::FailedCatalogMigration`).
    ///
    /// # Panics
    ///
    /// Panics if a builtin role or builtin cluster does not have a reserved name, if the
    /// durable catalog returns no updates, if a consolidated diff cannot be converted
    /// ("valid diff"), if the expression cache is enabled but its shard is missing, or if a
    /// dev build has no build ID available.
108    pub async fn initialize_state<'a>(
109        config: StateConfig,
110        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
111    ) -> Result<InitializeStateResult, AdapterError> {
        // Sanity checks: every builtin role and cluster must use a reserved name.
112        for builtin_role in BUILTIN_ROLES {
113            assert!(
114                is_reserved_name(builtin_role.name),
115                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
116                BUILTIN_PREFIXES.join(", ")
117            );
118        }
119        for builtin_cluster in BUILTIN_CLUSTERS {
120            assert!(
121                is_reserved_name(builtin_cluster.name),
122                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
123                BUILTIN_PREFIXES.join(", ")
124            );
125        }
126
127        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
128        if config.all_features {
129            system_configuration.enable_all_feature_flags_by_default();
130        }
131
        // Start from a state with empty/default collections; only the values taken directly
        // from `config` are filled in here. Everything else arrives through the updates
        // applied below.
132        let mut state = CatalogState {
133            database_by_name: imbl::OrdMap::new(),
134            database_by_id: imbl::OrdMap::new(),
135            entry_by_id: imbl::OrdMap::new(),
136            entry_by_global_id: imbl::OrdMap::new(),
137            notices_by_dep_id: imbl::OrdMap::new(),
138            ambient_schemas_by_name: imbl::OrdMap::new(),
139            ambient_schemas_by_id: imbl::OrdMap::new(),
140            clusters_by_name: imbl::OrdMap::new(),
141            clusters_by_id: imbl::OrdMap::new(),
142            roles_by_name: imbl::OrdMap::new(),
143            roles_by_id: imbl::OrdMap::new(),
144            network_policies_by_id: imbl::OrdMap::new(),
145            role_auth_by_id: imbl::OrdMap::new(),
146            network_policies_by_name: imbl::OrdMap::new(),
147            system_configuration: Arc::new(system_configuration),
148            scoped_system_parameters: Default::default(),
149            default_privileges: Arc::new(DefaultPrivileges::default()),
150            system_privileges: Arc::new(PrivilegeMap::default()),
151            comments: Arc::new(CommentsMap::default()),
152            source_references: imbl::OrdMap::new(),
153            storage_metadata: Arc::new(StorageMetadata::default()),
154            temporary_schemas: imbl::OrdMap::new(),
155            mock_authentication_nonce: Default::default(),
156            config: mz_sql::catalog::CatalogConfig {
157                start_time: to_datetime((config.now)()),
158                start_instant: Instant::now(),
159                nonce: rand::random(),
160                environment_id: config.environment_id,
161                session_id: Uuid::new_v4(),
162                build_info: config.build_info,
163                now: config.now.clone(),
164                connection_context: config.connection_context,
165                helm_chart_version: config.helm_chart_version,
166            },
167            cluster_replica_sizes: config.cluster_replica_sizes,
168            availability_zones: config.availability_zones,
169            egress_addresses: config.egress_addresses,
170            aws_principal_context: config.aws_principal_context,
171            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
172            http_host_name: config.http_host_name,
173            license_key: config.license_key,
174        };
175
        // Passed to the builtin schema migration further down.
176        let deploy_generation = storage.get_deployment_generation().await?;
177
        // Read the current durable contents, then open the transaction that all migrations
        // below write into. It is committed once, at the very end of this function.
178        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
179        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
180        let mut txn = storage.transaction().await?;
181
182        // Migrate/update durable data before we start loading the in-memory catalog.
183        let new_builtin_collections = {
184            migrate::durable_migrate(
185                &mut txn,
186                state.config.environment_id.organization_id(),
187                config.boot_ts,
188            )?;
189            // Overwrite and persist selected parameter values in `remote_system_parameters` that
190            // were pulled from a remote frontend (e.g. LaunchDarkly) if present.
191            if let Some(remote_system_parameters) = config.remote_system_parameters {
192                for (name, value) in remote_system_parameters {
193                    txn.upsert_system_config(&name, value)?;
194                }
195                txn.set_system_config_synced_once()?;
196            }
197            // Add any new builtin objects and remove old ones.
198            let new_builtin_collections = add_new_remove_old_builtin_items_migration(&mut txn)?;
199            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
200                system_cluster: config.builtin_system_cluster_config,
201                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
202                probe_cluster: config.builtin_probe_cluster_config,
203                support_cluster: config.builtin_support_cluster_config,
204                analytics_cluster: config.builtin_analytics_cluster_config,
205            };
206            add_new_remove_old_builtin_clusters_migration(
207                &mut txn,
208                &builtin_bootstrap_cluster_config_map,
209                config.boot_ts,
210            )?;
211            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
212            add_new_remove_old_builtin_cluster_replicas_migration(
213                &mut txn,
214                &builtin_bootstrap_cluster_config_map,
215                config.boot_ts,
216            )?;
217            add_new_remove_old_builtin_roles_migration(&mut txn)?;
218            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
219            remove_pending_cluster_replicas_migration(&mut txn, config.boot_ts)?;
220
221            new_builtin_collections
222        };
223
        // Append the updates generated by the migrations above to the initial updates, so both
        // are consolidated and applied together.
224        let op_updates = txn.get_and_commit_op_updates();
225        updates.extend(op_updates);
226
227        let mut builtin_table_updates = Vec::new();
228
229        // Seed the in-memory catalog with values that don't come from the durable catalog.
230        {
231            // Set defaults from configuration passed in the provided `system_parameter_defaults`
232            // map.
233            for (name, value) in config.system_parameter_defaults {
234                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
235                    Ok(_) => (),
                    // An unknown parameter is only logged; any other error aborts startup.
236                    Err(Error {
237                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
238                    }) => {
239                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
240                    }
241                    Err(e) => return Err(e.into()),
242                };
243            }
244            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
245        }
246
247        // Make life easier by consolidating all updates, so that we end up with only positive
248        // diffs.
249        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
250        differential_dataflow::consolidation::consolidate_updates(&mut updates);
251        soft_assert_no_log!(
252            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
253            "consolidated updates should be positive during startup: {updates:?}"
254        );
255
        // Sort the consolidated updates into buckets, one per phase in which they are applied
        // below. `post_item_updates` stays in its raw `(kind, ts, diff)` form because it may
        // still be extended with migration output and re-consolidated before being converted.
256        let mut pre_item_updates = Vec::new();
257        let mut system_item_updates = Vec::new();
258        let mut item_updates = Vec::new();
259        let mut post_item_updates = Vec::new();
260        let mut audit_log_updates = Vec::new();
261        for (kind, ts, diff) in updates {
262            match kind {
263                BootstrapStateUpdateKind::Role(_)
264                | BootstrapStateUpdateKind::RoleAuth(_)
265                | BootstrapStateUpdateKind::Database(_)
266                | BootstrapStateUpdateKind::Schema(_)
267                | BootstrapStateUpdateKind::DefaultPrivilege(_)
268                | BootstrapStateUpdateKind::SystemPrivilege(_)
269                | BootstrapStateUpdateKind::SystemConfiguration(_)
270                | BootstrapStateUpdateKind::ClusterSystemConfiguration(_)
271                | BootstrapStateUpdateKind::ReplicaSystemConfiguration(_)
272                | BootstrapStateUpdateKind::Cluster(_)
273                | BootstrapStateUpdateKind::NetworkPolicy(_)
274                | BootstrapStateUpdateKind::ClusterReplica(_) => {
275                    pre_item_updates.push(StateUpdate {
276                        kind: kind.into(),
277                        ts,
278                        diff: diff.try_into().expect("valid diff"),
279                    })
280                }
281                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
282                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
283                    system_item_updates.push(StateUpdate {
284                        kind: kind.into(),
285                        ts,
286                        diff: diff.try_into().expect("valid diff"),
287                    })
288                }
289                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
290                    kind: kind.into(),
291                    ts,
292                    diff: diff.try_into().expect("valid diff"),
293                }),
294                BootstrapStateUpdateKind::Comment(_)
295                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
296                | BootstrapStateUpdateKind::SourceReferences(_)
297                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
298                    post_item_updates.push((kind, ts, diff));
299                }
300                BootstrapStateUpdateKind::AuditLog(_) => {
301                    audit_log_updates.push(StateUpdate {
302                        kind: kind.into(),
303                        ts,
304                        diff: diff.try_into().expect("valid diff"),
305                    });
306                }
307            }
308        }
309
        // Phase 1: pre-item updates. No expression cache is available yet, hence `Closed`.
310        let (builtin_table_update, _catalog_updates) = state
311            .apply_updates(pre_item_updates, &mut LocalExpressionCache::Closed)
312            .await;
313        builtin_table_updates.extend(builtin_table_update);
314
315        // Ensure mz_system has a password if configured to have one.
316        // It's important we do this after the `pre_item_updates` so that
317        // the mz_system role exists in the catalog.
318        {
319            if let Some(password) = config.external_login_password_mz_system {
320                let role_auth = RoleAuth {
321                    role_id: MZ_SYSTEM_ROLE_ID,
322                    // builtin roles should always use a secure scram iteration
323                    // <https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html>
324                    password_hash: Some(
325                        scram256_hash(&password, &NonZeroU32::new(600_000).expect("known valid"))
326                            .map_err(|_| {
327                            AdapterError::Internal("Failed to hash mz_system password.".to_owned())
328                        })?,
329                    ),
330                    updated_at: SYSTEM_TIME(),
331                };
                // The entry is written straight into the in-memory map here rather than going
                // through `txn`.
332                state
333                    .role_auth_by_id
334                    .insert(MZ_SYSTEM_ROLE_ID, role_auth.clone());
335                let builtin_table_update = state.generate_builtin_table_update(
336                    mz_catalog::memory::objects::StateUpdateKind::RoleAuth(role_auth.into()),
337                    mz_catalog::memory::objects::StateDiff::Addition,
338                );
339                builtin_table_updates.extend(builtin_table_update);
340            }
341        }
342
343        let expr_cache_start = Instant::now();
344        info!("startup: coordinator init: catalog open: expr cache open beginning");
345        // We wait until after the `pre_item_updates` to open the cache so we can get accurate
346        // dyncfgs because the `pre_item_updates` contains `SystemConfiguration` updates.
347        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
        // An explicit override in `config` takes precedence over the dyncfg value.
348        let expr_cache_enabled = config
349            .enable_expression_cache_override
350            .unwrap_or(enable_expr_cache_dyncfg);
351        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
352            info!(
353                ?config.enable_expression_cache_override,
354                ?enable_expr_cache_dyncfg,
355                "using expression cache for startup"
356            );
            // Every global ID currently known to the transaction: each item's ID, the IDs of
            // its extra versions, and the IDs of all system object mappings.
357            let current_ids = txn
358                .get_items()
359                .flat_map(|item| {
360                    let gid = item.global_id.clone();
361                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
362                    std::iter::once(gid).chain(gids)
363                })
364                .chain(
365                    txn.get_system_object_mappings()
366                        .map(|som| som.unique_identifier.global_id),
367                )
368                .collect();
369            let dyncfgs = config.persist_client.dyncfgs().clone();
370            let build_version = if config.build_info.is_dev() {
371                // A single dev version can be used for many different builds, so we need to use
372                // the build version that is also enriched with build metadata.
373                config
374                    .build_info
375                    .semver_version_build()
376                    .expect("build ID is not available on your platform!")
377            } else {
378                config.build_info.semver_version()
379            };
380            let expr_cache_config = ExpressionCacheConfig {
381                build_version,
382                shard_id: txn
383                    .get_expression_cache_shard()
384                    .expect("expression cache shard should exist for opened catalogs"),
385                persist: config.persist_client,
386                current_ids,
387                remove_prior_versions: !config.read_only,
388                compact_shard: config.read_only,
389                dyncfgs,
390            };
391            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
392                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
393            (
394                Some(expr_cache_handle),
395                cached_local_exprs,
396                cached_global_exprs,
397            )
398        } else {
            // Cache disabled: no handle and nothing cached.
399            (None, BTreeMap::new(), BTreeMap::new())
400        };
401        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
402        info!(
403            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
404            expr_cache_start.elapsed()
405        );
406
        // Phase 2: system item updates.
407        // When initializing/bootstrapping, we don't use the catalog updates but
408        // instead load the catalog fully and then go ahead and apply commands
409        // to the controller(s). Maybe we _should_ instead use the same logic
410        // and return and use the updates from here. But that's at the very
411        // least future work.
412        let (builtin_table_update, _catalog_updates) = state
413            .apply_updates(system_item_updates, &mut local_expr_cache)
414            .await;
415        builtin_table_updates.extend(builtin_table_update);
416
        // Captured before `set_migration_version` is called further down; "new" means no
        // migration version was found.
417        let last_seen_version =
418            get_migration_version(&txn).map_or_else(|| "new".into(), |v| v.to_string());
419
420        let mz_authentication_mock_nonce =
421            txn.get_authentication_mock_nonce().ok_or_else(|| {
422                Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
423            })?;
424
425        state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);
426
        // Phase 3: item updates. These either go through `migrate::migrate` or, when
        // migrations are skipped, are applied directly.
427        // Migrate item ASTs.
428        let (builtin_table_update, _catalog_updates) = if !config.skip_migrations {
429            let migrate_result = migrate::migrate(
430                &mut state,
431                &mut txn,
432                &mut local_expr_cache,
433                item_updates,
434                config.now,
435                config.boot_ts,
436            )
437            .await
438            .map_err(|e| {
439                Error::new(ErrorKind::FailedCatalogMigration {
440                    last_seen_version: last_seen_version.clone(),
441                    this_version: config.build_info.version,
442                    cause: e.to_string(),
443                })
444            })?;
445            if !migrate_result.post_item_updates.is_empty() {
446                // Include any post-item-updates generated by migrations, and then consolidate
447                // them to ensure diffs are all positive.
448                post_item_updates.extend(migrate_result.post_item_updates);
449                // Push everything to the same timestamp so it consolidates cleanly.
450                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
451                    for (_, ts, _) in &mut post_item_updates {
452                        *ts = max_ts;
453                    }
454                }
455                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
456            }
457
458            (
459                migrate_result.builtin_table_updates,
460                migrate_result.catalog_updates,
461            )
462        } else {
463            state
464                .apply_updates(item_updates, &mut local_expr_cache)
465                .await
466        };
467        builtin_table_updates.extend(builtin_table_update);
468
        // Phase 4: post-item updates, converted to `StateUpdate`s only now that no further
        // consolidation is needed.
469        let post_item_updates = post_item_updates
470            .into_iter()
471            .map(|(kind, ts, diff)| StateUpdate {
472                kind: kind.into(),
473                ts,
474                diff: diff.try_into().expect("valid diff"),
475            })
476            .collect();
477        let (builtin_table_update, _catalog_updates) = state
478            .apply_updates(post_item_updates, &mut local_expr_cache)
479            .await;
480        builtin_table_updates.extend(builtin_table_update);
481
482        // We don't need to apply the audit logs in memory, yet apply can be expensive when the
483        // audit log grows large. Therefore, we skip the apply step and just generate the builtin
484        // updates.
485        for audit_log_update in audit_log_updates {
486            builtin_table_updates.extend(
487                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
488            );
489        }
490
491        // Migrate builtin items.
492        let schema_migration_result = builtin_schema_migration::run(
493            config.build_info,
494            deploy_generation,
495            &mut txn,
496            config.builtin_item_migration_config,
497        )
498        .await?;
499
        // Updates written to `txn` since the previous `get_and_commit_op_updates` call.
500        let state_updates = txn.get_and_commit_op_updates();
501
502        // When initializing/bootstrapping, we don't use the catalog updates but
503        // instead load the catalog fully and then go ahead and apply commands
504        // to the controller(s). Maybe we _should_ instead use the same logic
505        // and return and use the updates from here. But that's at the very
506        // least future work.
507        let (table_updates, _catalog_updates) = state
508            .apply_updates(state_updates, &mut local_expr_cache)
509            .await;
510        builtin_table_updates.extend(table_updates);
511        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);
512
513        // Bump the migration version immediately before committing.
514        set_migration_version(&mut txn, config.build_info.semver_version())?;
515
516        txn.commit(config.boot_ts).await?;
517
518        // Now that the migration is durable, run any requested deferred cleanup.
519        schema_migration_result.cleanup_action.await;
520
521        Ok(InitializeStateResult {
522            state,
523            migrated_storage_collections_0dt: schema_migration_result.replaced_items,
524            new_builtin_collections: new_builtin_collections.into_iter().collect(),
525            builtin_table_updates,
526            last_seen_version,
527            expr_cache_handle,
528            cached_global_exprs,
529            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
530        })
531    }
532
533    /// Opens a catalog on top of the durable storage handle in `config.storage`.
534    ///
535    /// Returns the catalog, metadata about builtin objects that have changed
536    /// schemas since last restart, and a list of updates to builtin tables that
537    /// describe the initial state of the catalog. The pre-migration catalog version
538    /// computed by [`Catalog::initialize_state`] is discarded here.
539    ///
    /// # Errors
    ///
    /// Propagates any error from [`Catalog::initialize_state`] and from packing the egress
    /// address or license key builtin table updates.
    ///
    /// # Panics
    ///
    /// Panics if `OP_IMPLS` contains an entry that is not a scalar function.
    ///
540    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 17KB. This would
541    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
542    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
543    #[instrument(name = "catalog::open")]
544    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
545        async move {
546            let mut storage = config.storage;
547
            // `last_seen_version` is deliberately ignored; everything else is either moved
            // into the `Catalog` or forwarded in the result.
548            let InitializeStateResult {
549                state,
550                migrated_storage_collections_0dt,
551                new_builtin_collections,
552                mut builtin_table_updates,
553                last_seen_version: _,
554                expr_cache_handle,
555                cached_global_exprs,
556                uncached_local_exprs,
557            } =
558                // BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
559                // get stored on the stack which is bad for runtime performance, and blow up our stack usage.
560                // Because of that we purposefully move this Future onto the heap (i.e. Box it).
561                Self::initialize_state(config.state, &mut storage)
562                    .instrument(tracing::info_span!("catalog::initialize_state"))
563                    .boxed()
564                    .await?;
565
            // From here on the storage handle is owned by the catalog, behind an async mutex.
566            let catalog = Catalog {
567                state,
568                expr_cache_handle,
569                transient_revision: 1,
570                storage: Arc::new(tokio::sync::Mutex::new(storage)),
571            };
572
573            // Operators aren't stored in the catalog, but we would like them in
574            // introspection views.
575            for (op, func) in OP_IMPLS.iter() {
576                match func {
577                    mz_sql::func::Func::Scalar(impls) => {
578                        for imp in impls {
579                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
580                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
581                            ));
582                        }
583                    }
584                    _ => unreachable!("all operators must be scalar functions"),
585                }
586            }
587
            // One builtin table update per configured egress address.
588            for ip in &catalog.state.egress_addresses {
589                builtin_table_updates.push(
590                    catalog
591                        .state
592                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
593                );
594            }
595
            // A license key with an empty id produces no builtin table update.
596            if !catalog.state.license_key.id.is_empty() {
597                builtin_table_updates.push(
598                    catalog.state.resolve_builtin_table_update(
599                        catalog
600                            .state
601                            .pack_license_key_update(&catalog.state.license_key)?,
602                    ),
603                );
604            }
605
            // Tell the durable catalog that bootstrapping has finished.
606            catalog.storage().await.mark_bootstrap_complete().await;
607
608            Ok(OpenCatalogResult {
609                catalog,
610                migrated_storage_collections_0dt,
611                new_builtin_collections,
612                builtin_table_updates,
613                cached_global_exprs,
614                uncached_local_exprs,
615            })
616        }
617        .instrument(tracing::info_span!("catalog::open"))
618        .boxed()
619    }
620
621    /// Initializes STORAGE to understand all shards that `self` expects to
622    /// exist.
623    ///
624    /// Note that this must be done before creating/rendering collections
625    /// because the storage controller might not be aware of new system
626    /// collections created between versions.
    ///
    /// All changes are made on a clone of `self.state` and inside one catalog transaction;
    /// `self.state` is only replaced after that transaction has committed.
    ///
    /// # Errors
    ///
    /// Returns an error if opening or committing the transaction fails, if inserting the
    /// collection metadata for `MZ_CATALOG_RAW` fails, or if
    /// `storage_collections.initialize_state` fails.
    ///
    /// # Panics
    ///
    /// Panics if `MZ_CATALOG_RAW` is already mapped to a shard other than the catalog shard,
    /// or if applying the transaction's updates yields any builtin table updates or catalog
    /// updates.
627    async fn initialize_storage_state(
628        &mut self,
629        storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
630    ) -> Result<(), mz_catalog::durable::CatalogError> {
        // The global IDs of every catalog entry whose item is a storage collection.
631        let collections = self
632            .entries()
633            .filter(|entry| entry.item().is_storage_collection())
634            .flat_map(|entry| entry.global_ids())
635            .collect();
636
637        // Clone the state so that any errors that occur do not leak any
638        // transformations on error.
639        let mut state = self.state.clone();
640
641        let mut storage = self.storage().await;
642        let shard_id = storage.shard_id();
643        let mut txn = storage.transaction().await?;
644
645        // Ensure the storage controller knows about the catalog shard and associates it with the
646        // `MZ_CATALOG_RAW` builtin source.
647        let item_id = self.resolve_builtin_storage_collection(&MZ_CATALOG_RAW);
648        let global_id = self.get_entry(&item_id).latest_global_id();
649        match txn.get_collection_metadata().get(&global_id) {
            // Not registered yet: record the mapping.
650            None => {
651                txn.insert_collection_metadata([(global_id, shard_id)].into())
652                    .map_err(mz_catalog::durable::DurableCatalogError::from)?;
653            }
            // Already registered: it must point at the catalog shard.
654            Some(id) => assert_eq!(*id, shard_id),
655        }
656
657        storage_collections
658            .initialize_state(&mut txn, collections)
659            .await
660            .map_err(mz_catalog::durable::DurableCatalogError::from)?;
661
        // Apply whatever the transaction recorded to the cloned state. The expression cache
        // is not used here, hence `Closed`.
662        let updates = txn.get_and_commit_op_updates();
663        let (builtin_updates, catalog_updates) = state
664            .apply_updates(updates, &mut LocalExpressionCache::Closed)
665            .await;
666        assert!(
667            builtin_updates.is_empty(),
668            "storage is not allowed to generate catalog changes that would cause changes to builtin tables"
669        );
670        assert!(
671            catalog_updates.is_empty(),
672            "storage is not allowed to generate catalog changes that would change the catalog or controller state"
673        );
        // Commit at the transaction's own upper.
674        let commit_ts = txn.upper();
675        txn.commit(commit_ts).await?;
        // Drop the storage guard before writing the new state back into `self`.
676        drop(storage);
677
678        // Save updated state.
679        self.state = state;
680        Ok(())
681    }
682
683    /// [`mz_controller::Controller`] depends on durable catalog state to boot,
684    /// so make it available and initialize the controller.
685    pub async fn initialize_controller(
686        &mut self,
687        config: mz_controller::ControllerConfig,
688        envd_epoch: core::num::NonZeroI64,
689        read_only: bool,
690    ) -> Result<mz_controller::Controller, mz_catalog::durable::CatalogError> {
691        let controller_start = Instant::now();
692        info!("startup: controller init: beginning");
693
694        let controller = {
695            let mut storage = self.storage().await;
696            let mut tx = storage.transaction().await?;
697            mz_controller::prepare_initialization(&mut tx)
698                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
699            let updates = tx.get_and_commit_op_updates();
700            assert!(
701                updates.is_empty(),
702                "initializing controller should not produce updates: {updates:?}"
703            );
704            let commit_ts = tx.upper();
705            tx.commit(commit_ts).await?;
706
707            let read_only_tx = storage.transaction().await?;
708
709            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
710        };
711
712        self.initialize_storage_state(&controller.storage_collections)
713            .await?;
714
715        info!(
716            "startup: controller init: complete in {:?}",
717            controller_start.elapsed()
718        );
719
720        Ok(controller)
721    }
722
723    /// Politely releases all external resources that can only be released in an async context.
724    pub async fn expire(self) {
725        // If no one else holds a reference to storage, then clean up the storage resources.
726        // Otherwise, hopefully the other reference cleans up the resources when it's dropped.
727        if let Some(storage) = Arc::into_inner(self.storage) {
728            let storage = storage.into_inner();
729            storage.expire().await;
730        }
731    }
732}
733
734impl CatalogState {
735    /// Set the default value for `name`, which is the value it will be reset to.
736    fn set_system_configuration_default(
737        &mut self,
738        name: &str,
739        value: VarInput,
740    ) -> Result<(), Error> {
741        Ok(Arc::make_mut(&mut self.system_configuration).set_default(name, value)?)
742    }
743}
744
745/// Updates the catalog with new and removed builtin items.
746///
747/// Returns the list of new builtin [`GlobalId`]s.
748fn add_new_remove_old_builtin_items_migration(
749    txn: &mut mz_catalog::durable::Transaction<'_>,
750) -> Result<Vec<GlobalId>, mz_catalog::durable::CatalogError> {
751    let mut new_builtin_mappings = Vec::new();
752    // Used to validate unique descriptions.
753    let mut builtin_descs = HashSet::new();
754
755    // We compare the builtin items that are compiled into the binary with the builtin items that
756    // are persisted in the catalog to discover new and deleted builtin items.
757    let mut builtins = Vec::new();
758    for builtin in BUILTINS::iter() {
759        let desc = SystemObjectDescription {
760            schema_name: builtin.schema().to_string(),
761            object_type: builtin.catalog_item_type(),
762            object_name: builtin.name().to_string(),
763        };
764        // Validate that the description is unique.
765        if !builtin_descs.insert(desc.clone()) {
766            panic!(
767                "duplicate builtin description: {:?}, {:?}",
768                SystemObjectDescription {
769                    schema_name: builtin.schema().to_string(),
770                    object_type: builtin.catalog_item_type(),
771                    object_name: builtin.name().to_string(),
772                },
773                builtin
774            );
775        }
776        builtins.push((desc, builtin));
777    }
778
779    let mut system_object_mappings: BTreeMap<_, _> = txn
780        .get_system_object_mappings()
781        .map(|system_object_mapping| {
782            (
783                system_object_mapping.description.clone(),
784                system_object_mapping,
785            )
786        })
787        .collect();
788
789    let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
790        builtins.into_iter().partition_map(|(desc, builtin)| {
791            let fingerprint = match builtin.runtime_alterable() {
792                false => builtin.fingerprint(),
793                true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
794            };
795            match system_object_mappings.remove(&desc) {
796                Some(system_object_mapping) => {
797                    Either::Left((builtin, system_object_mapping, fingerprint))
798                }
799                None => Either::Right((builtin, fingerprint)),
800            }
801        });
802    let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
803    let new_builtins: Vec<_> = new_builtins
804        .into_iter()
805        .zip_eq(new_builtin_ids.clone())
806        .collect();
807
808    // Add new builtin items to catalog.
809    for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
810        new_builtin_mappings.push(SystemObjectMapping {
811            description: SystemObjectDescription {
812                schema_name: builtin.schema().to_string(),
813                object_type: builtin.catalog_item_type(),
814                object_name: builtin.name().to_string(),
815            },
816            unique_identifier: SystemObjectUniqueIdentifier {
817                catalog_id,
818                global_id,
819                fingerprint,
820            },
821        });
822
823        // Runtime-alterable system objects are durably recorded to the
824        // usual items collection, so that they can be later altered at
825        // runtime by their owner (i.e., outside of the usual builtin
826        // migration framework that requires changes to the binary
827        // itself).
828        let handled_runtime_alterable = match builtin {
829            Builtin::Connection(c) if c.runtime_alterable => {
830                let mut acl_items = vec![rbac::owner_privilege(
831                    mz_sql::catalog::ObjectType::Connection,
832                    c.owner_id.clone(),
833                )];
834                acl_items.extend_from_slice(c.access);
835                // Builtin Connections cannot be versioned.
836                let versions = BTreeMap::new();
837
838                txn.insert_item(
839                    catalog_id,
840                    c.oid,
841                    global_id,
842                    mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
843                    c.name,
844                    c.sql.into(),
845                    *c.owner_id,
846                    acl_items,
847                    versions,
848                )?;
849                true
850            }
851            _ => false,
852        };
853        assert_eq!(
854            builtin.runtime_alterable(),
855            handled_runtime_alterable,
856            "runtime alterable object was not handled by migration",
857        );
858    }
859    txn.set_system_object_mappings(new_builtin_mappings)?;
860
861    // Update comments of all builtin objects
862    let builtins_with_catalog_ids = existing_builtins
863        .iter()
864        .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
865        .chain(
866            new_builtins
867                .into_iter()
868                .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
869        );
870
871    for (builtin, id) in builtins_with_catalog_ids {
872        let (comment_id, desc, comments) = match builtin {
873            Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
874            Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
875            Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
876            Builtin::MaterializedView(mv) => (
877                CommentObjectId::MaterializedView(id),
878                &mv.desc,
879                &mv.column_comments,
880            ),
881            Builtin::Log(_)
882            | Builtin::Type(_)
883            | Builtin::Func(_)
884            | Builtin::Index(_)
885            | Builtin::Connection(_) => continue,
886        };
887        txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;
888
889        let mut comments = comments.clone();
890        for (col_idx, name) in desc.iter_names().enumerate() {
891            if let Some(comment) = comments.remove(name.as_str()) {
892                // Comment column indices are 1 based
893                txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
894            }
895        }
896        assert!(
897            comments.is_empty(),
898            "builtin object contains dangling comments that don't correspond to columns {comments:?}"
899        );
900    }
901
902    // Anything left in `system_object_mappings` must have been deleted and should be removed from
903    // the catalog.
904    let mut deleted_system_objects = BTreeSet::new();
905    let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
906    let mut deleted_comments = BTreeSet::new();
907    for (desc, mapping) in system_object_mappings {
908        deleted_system_objects.insert(mapping.description);
909        if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
910            deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
911        }
912
913        let id = mapping.unique_identifier.catalog_id;
914        let comment_id = match desc.object_type {
915            CatalogItemType::Table => CommentObjectId::Table(id),
916            CatalogItemType::Source => CommentObjectId::Source(id),
917            CatalogItemType::View => CommentObjectId::View(id),
918            CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(id),
919            CatalogItemType::Sink
920            | CatalogItemType::Index
921            | CatalogItemType::Type
922            | CatalogItemType::Func
923            | CatalogItemType::Secret
924            | CatalogItemType::Connection => continue,
925        };
926        deleted_comments.insert(comment_id);
927    }
928    // If you are 100% positive that it is safe to delete a system object outside any of the
929    // unstable schemas, then add it to this set. Make sure that no prod environments are
930    // using this object and that the upgrade checker does not show any issues.
931    //
932    // Objects can be removed from this set after one release.
933    let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
934    // TODO(jkosh44) Technically we could support changing the type of a builtin object outside
935    // of unstable schemas (i.e. from a table to a view). However, builtin migrations don't currently
936    // handle that scenario correctly.
937    assert!(
938        deleted_system_objects
939            .iter()
940            // It's okay if Indexes change because they're inherently ephemeral.
941            .filter(|object| object.object_type != CatalogItemType::Index)
942            .all(
943                |deleted_object| is_unstable_schema(&deleted_object.schema_name)
944                    || delete_exceptions.contains(deleted_object)
945            ),
946        "only objects in unstable schemas can be deleted, deleted objects: {:?}",
947        deleted_system_objects
948    );
949    txn.drop_comments(&deleted_comments)?;
950    txn.remove_items(&deleted_runtime_alterable_system_ids)?;
951    txn.remove_system_object_mappings(deleted_system_objects)?;
952
953    // Filter down to just the GlobalIds which are used to track the underlying collections.
954    let new_builtin_collections = new_builtin_ids
955        .into_iter()
956        .map(|(_catalog_id, global_id)| global_id)
957        .collect();
958
959    Ok(new_builtin_collections)
960}
961
962fn add_new_remove_old_builtin_clusters_migration(
963    txn: &mut mz_catalog::durable::Transaction<'_>,
964    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
965    boot_ts: Timestamp,
966) -> Result<(), mz_catalog::durable::CatalogError> {
967    let mut durable_clusters: BTreeMap<_, _> = txn
968        .get_clusters()
969        .filter(|cluster| cluster.id.is_system())
970        .map(|cluster| (cluster.name.to_string(), cluster))
971        .collect();
972
973    // Add new clusters.
974    for builtin_cluster in BUILTIN_CLUSTERS {
975        if durable_clusters.remove(builtin_cluster.name).is_none() {
976            let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
977
978            let cluster_id = txn.insert_system_cluster(
979                builtin_cluster.name,
980                vec![],
981                builtin_cluster.privileges.to_vec(),
982                builtin_cluster.owner_id.to_owned(),
983                mz_catalog::durable::ClusterConfig {
984                    variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
985                        size: cluster_config.size,
986                        availability_zones: vec![],
987                        replication_factor: cluster_config.replication_factor,
988                        logging: default_logging_config(),
989                        optimizer_feature_overrides: Default::default(),
990                        schedule: Default::default(),
991                        auto_scaling_strategy: None,
992                        reconfiguration: None,
993                        burst: None,
994                    }),
995                    workload_class: None,
996                },
997                &HashSet::new(),
998            )?;
999
1000            let audit_id = txn.allocate_audit_log_id()?;
1001            txn.insert_audit_log_event(VersionedEvent::new(
1002                audit_id,
1003                EventType::Create,
1004                ObjectType::Cluster,
1005                EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1006                    id: cluster_id.to_string(),
1007                    name: builtin_cluster.name.to_string(),
1008                }),
1009                None,
1010                boot_ts.into(),
1011            ));
1012        }
1013    }
1014
1015    // Remove old clusters.
1016    let old_clusters = durable_clusters
1017        .values()
1018        .map(|cluster| cluster.id)
1019        .collect();
1020    txn.remove_clusters(&old_clusters)?;
1021
1022    for (_name, cluster) in &durable_clusters {
1023        let audit_id = txn.allocate_audit_log_id()?;
1024        txn.insert_audit_log_event(VersionedEvent::new(
1025            audit_id,
1026            EventType::Drop,
1027            ObjectType::Cluster,
1028            EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1029                id: cluster.id.to_string(),
1030                name: cluster.name.clone(),
1031            }),
1032            None,
1033            boot_ts.into(),
1034        ));
1035    }
1036
1037    Ok(())
1038}
1039
1040fn add_new_remove_old_builtin_introspection_source_migration(
1041    txn: &mut mz_catalog::durable::Transaction<'_>,
1042) -> Result<(), AdapterError> {
1043    let mut new_indexes = Vec::new();
1044    let mut removed_indexes = BTreeSet::new();
1045    for cluster in txn.get_clusters() {
1046        let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
1047
1048        let mut new_logs = Vec::new();
1049
1050        for log in BUILTINS::logs() {
1051            if introspection_source_index_ids.remove(log.name).is_none() {
1052                new_logs.push(log);
1053            }
1054        }
1055
1056        for log in new_logs {
1057            let (item_id, gid) =
1058                Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1059            new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1060        }
1061
1062        // Anything left in `introspection_source_index_ids` must have been deleted and should be
1063        // removed from the catalog.
1064        removed_indexes.extend(
1065            introspection_source_index_ids
1066                .into_keys()
1067                .map(|name| (cluster.id, name.to_string())),
1068        );
1069    }
1070    txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1071    txn.remove_introspection_source_indexes(removed_indexes)?;
1072    Ok(())
1073}
1074
1075fn add_new_remove_old_builtin_roles_migration(
1076    txn: &mut mz_catalog::durable::Transaction<'_>,
1077) -> Result<(), mz_catalog::durable::CatalogError> {
1078    let mut durable_roles: BTreeMap<_, _> = txn
1079        .get_roles()
1080        .filter(|role| role.id.is_system() || role.id.is_predefined())
1081        .map(|role| (role.name.to_string(), role))
1082        .collect();
1083
1084    // Add new roles.
1085    for builtin_role in BUILTIN_ROLES {
1086        if durable_roles.remove(builtin_role.name).is_none() {
1087            txn.insert_builtin_role(
1088                builtin_role.id,
1089                builtin_role.name.to_string(),
1090                builtin_role.attributes.clone(),
1091                RoleMembership::new(),
1092                RoleVars::default(),
1093                builtin_role.oid,
1094            )?;
1095        }
1096    }
1097
1098    // Remove old roles.
1099    let old_roles = durable_roles.values().map(|role| role.id).collect();
1100    txn.remove_roles(&old_roles)?;
1101
1102    Ok(())
1103}
1104
1105fn add_new_remove_old_builtin_cluster_replicas_migration(
1106    txn: &mut Transaction<'_>,
1107    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1108    boot_ts: Timestamp,
1109) -> Result<(), AdapterError> {
1110    let cluster_lookup: BTreeMap<_, _> = txn
1111        .get_clusters()
1112        .map(|cluster| (cluster.name.clone(), cluster.clone()))
1113        .collect();
1114
1115    let cluster_id_to_name: BTreeMap<ClusterId, String> = cluster_lookup
1116        .values()
1117        .map(|cluster| (cluster.id, cluster.name.clone()))
1118        .collect();
1119
1120    let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1121        .get_cluster_replicas()
1122        .filter(|replica| replica.replica_id.is_system())
1123        .fold(BTreeMap::new(), |mut acc, replica| {
1124            acc.entry(replica.cluster_id)
1125                .or_insert_with(BTreeMap::new)
1126                .insert(replica.name.to_string(), replica);
1127            acc
1128        });
1129
1130    // Add new replicas.
1131    for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1132        let cluster = cluster_lookup
1133            .get(builtin_replica.cluster_name)
1134            .expect("builtin cluster replica references non-existent cluster");
1135        // `empty_map` is a hack to simplify the if statement below.
1136        let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1137        let replica_names = durable_replicas
1138            .get_mut(&cluster.id)
1139            .unwrap_or(&mut empty_map);
1140
1141        let builtin_cluster_bootstrap_config =
1142            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1143        if replica_names.remove(builtin_replica.name).is_none()
1144            // NOTE(SangJunBak): We need to explicitly check the replication factor because
1145            // BUILT_IN_CLUSTER_REPLICAS is constant throughout all deployments but the replication
1146            // factor is configurable on bootstrap.
1147            && builtin_cluster_bootstrap_config.replication_factor > 0
1148        {
1149            let replica_size = match cluster.config.variant {
1150                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1151                ClusterVariant::Unmanaged => builtin_cluster_bootstrap_config.size.clone(),
1152            };
1153
1154            let config = builtin_cluster_replica_config(replica_size.clone());
1155            let replica_id = txn.insert_cluster_replica(
1156                cluster.id,
1157                builtin_replica.name,
1158                config,
1159                MZ_SYSTEM_ROLE_ID,
1160            )?;
1161
1162            let audit_id = txn.allocate_audit_log_id()?;
1163            txn.insert_audit_log_event(VersionedEvent::new(
1164                audit_id,
1165                EventType::Create,
1166                ObjectType::ClusterReplica,
1167                EventDetails::CreateClusterReplicaV4(mz_audit_log::CreateClusterReplicaV4 {
1168                    cluster_id: cluster.id.to_string(),
1169                    cluster_name: cluster.name.clone(),
1170                    replica_id: Some(replica_id.to_string()),
1171                    replica_name: builtin_replica.name.to_string(),
1172                    logical_size: replica_size,
1173                    billed_as: None,
1174                    internal: false,
1175                    reason: CreateOrDropClusterReplicaReasonV1::System,
1176                    scheduling_policies: None,
1177                }),
1178                None,
1179                boot_ts.into(),
1180            ));
1181        }
1182    }
1183
1184    // Remove old replicas.
1185    let old_replicas: Vec<_> = durable_replicas
1186        .values()
1187        .flat_map(|replicas| replicas.values())
1188        .collect();
1189    let old_replica_ids = old_replicas.iter().map(|r| r.replica_id).collect();
1190    txn.remove_cluster_replicas(&old_replica_ids)?;
1191
1192    for replica in &old_replicas {
1193        let cluster_name = cluster_id_to_name
1194            .get(&replica.cluster_id)
1195            .cloned()
1196            .unwrap_or_else(|| "<unknown>".to_string());
1197
1198        let audit_id = txn.allocate_audit_log_id()?;
1199        txn.insert_audit_log_event(VersionedEvent::new(
1200            audit_id,
1201            EventType::Drop,
1202            ObjectType::ClusterReplica,
1203            EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1204                cluster_id: replica.cluster_id.to_string(),
1205                cluster_name,
1206                replica_id: Some(replica.replica_id.to_string()),
1207                replica_name: replica.name.clone(),
1208                reason: CreateOrDropClusterReplicaReasonV1::System,
1209                scheduling_policies: None,
1210            }),
1211            None,
1212            boot_ts.into(),
1213        ));
1214    }
1215
1216    Ok(())
1217}
1218
1219/// Roles can have default values for configuration parameters, e.g. you can set a Role default for
1220/// the 'cluster' parameter.
1221///
1222/// This migration exists to remove the Role default for a configuration parameter, if the persisted
1223/// input is no longer valid. For example if we remove a configuration parameter or change the
1224/// accepted set of values.
1225fn remove_invalid_config_param_role_defaults_migration(
1226    txn: &mut Transaction<'_>,
1227) -> Result<(), AdapterError> {
1228    static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1229
1230    let roles_to_migrate: BTreeMap<_, _> = txn
1231        .get_roles()
1232        .filter_map(|mut role| {
1233            // Create an empty SessionVars just so we can check if a var is valid.
1234            //
1235            // TODO(parkmycar): This is a bit hacky, instead we should have a static list of all
1236            // session variables.
1237            let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1238
1239            // Iterate over all of the variable defaults for this role.
1240            let mut invalid_roles_vars = BTreeMap::new();
1241            for (name, value) in &role.vars.map {
1242                // If one does not exist or its value is invalid, then mark it for removal.
1243                let Ok(session_var) = session_vars.inspect(name) else {
1244                    invalid_roles_vars.insert(name.clone(), value.clone());
1245                    continue;
1246                };
1247                if session_var.check(value.borrow()).is_err() {
1248                    invalid_roles_vars.insert(name.clone(), value.clone());
1249                }
1250            }
1251
1252            // If the role has no invalid values, nothing to do!
1253            if invalid_roles_vars.is_empty() {
1254                return None;
1255            }
1256
1257            tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1258
1259            // Otherwise, remove the variables from the role and return it to be updated.
1260            for (name, _value) in invalid_roles_vars {
1261                role.vars.map.remove(&name);
1262            }
1263            Some(role)
1264        })
1265        .map(|role| (role.id, role))
1266        .collect();
1267
1268    txn.update_roles_without_auth(roles_to_migrate)?;
1269
1270    Ok(())
1271}
1272
1273/// Cluster Replicas may be created ephemerally during an alter statement, these replicas
1274/// are marked as pending and should be cleaned up on catalog open.
1275fn remove_pending_cluster_replicas_migration(
1276    tx: &mut Transaction,
1277    boot_ts: mz_repr::Timestamp,
1278) -> Result<(), anyhow::Error> {
1279    // Build a map of cluster_id -> cluster_name for audit events.
1280    let cluster_names: BTreeMap<_, _> = tx.get_clusters().map(|c| (c.id, c.name)).collect();
1281
1282    let occurred_at = boot_ts.into();
1283
1284    for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1285        if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1286            replica.config.location
1287        {
1288            let cluster_name = cluster_names
1289                .get(&replica.cluster_id)
1290                .cloned()
1291                .unwrap_or_else(|| "<unknown>".to_string());
1292
1293            info!(
1294                "removing pending cluster replica '{}' from cluster '{}'",
1295                replica.name, cluster_name,
1296            );
1297
1298            tx.remove_cluster_replica(replica.replica_id)?;
1299
1300            // Emit an audit log event so that the drop is visible in
1301            // mz_audit_events, matching the create event that was
1302            // recorded when the pending replica was first created.
1303            let audit_id = tx.allocate_audit_log_id()?;
1304            tx.insert_audit_log_event(VersionedEvent::new(
1305                audit_id,
1306                EventType::Drop,
1307                ObjectType::ClusterReplica,
1308                EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1309                    cluster_id: replica.cluster_id.to_string(),
1310                    cluster_name,
1311                    replica_id: Some(replica.replica_id.to_string()),
1312                    replica_name: replica.name,
1313                    reason: CreateOrDropClusterReplicaReasonV1::System,
1314                    scheduling_policies: None,
1315                }),
1316                None,
1317                occurred_at,
1318            ));
1319        }
1320    }
1321    Ok(())
1322}
1323
1324pub(crate) fn builtin_cluster_replica_config(
1325    replica_size: String,
1326) -> mz_catalog::durable::ReplicaConfig {
1327    mz_catalog::durable::ReplicaConfig {
1328        location: mz_catalog::durable::ReplicaLocation::Managed {
1329            availability_zones: Vec::new(),
1330            billed_as: None,
1331            pending: false,
1332            internal: false,
1333            size: replica_size,
1334        },
1335        logging: default_logging_config(),
1336    }
1337}
1338
1339fn default_logging_config() -> ReplicaLogging {
1340    ReplicaLogging {
1341        log_logging: false,
1342        interval: Some(Duration::from_secs(1)),
1343    }
1344}
1345
/// Bootstrap configuration for each of the builtin clusters.
///
/// Each entry supplies the size and replication factor that its builtin
/// cluster defaults to on bootstrap.
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Size and replication factor to default system_cluster on bootstrap
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default catalog_server_cluster on bootstrap
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default probe_cluster on bootstrap
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default support_cluster on bootstrap
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Size and replication factor to default analytics_cluster on bootstrap
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1359
1360impl BuiltinBootstrapClusterConfigMap {
1361    /// Gets the size of the builtin cluster based on the provided name
1362    fn get_config(
1363        &self,
1364        cluster_name: &str,
1365    ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1366        let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1367            &self.system_cluster
1368        } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1369            &self.catalog_server_cluster
1370        } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1371            &self.probe_cluster
1372        } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1373            &self.support_cluster
1374        } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1375            &self.analytics_cluster
1376        } else {
1377            return Err(mz_catalog::durable::CatalogError::Catalog(
1378                SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1379            ));
1380        };
1381        Ok(cluster_config.clone())
1382    }
1383}
1384
1385/// Convert `updates` into a `Vec` that can be consolidated by doing the following:
1386///
1387///   - Convert each update into a type that implements [`std::cmp::Ord`].
1388///   - Update the timestamp of each update to the same value.
1389///   - Convert the diff of each update to a type that implements
1390///     [`differential_dataflow::difference::Semigroup`].
1391///
1392/// [`mz_catalog::memory::objects::StateUpdateKind`] does not implement [`std::cmp::Ord`] only
1393/// because it contains a variant for temporary items, which do not implement [`std::cmp::Ord`].
1394/// However, we know that during bootstrap no temporary items exist, because they are not persisted
1395/// and are only created after bootstrap is complete. So we forcibly convert each
1396/// [`mz_catalog::memory::objects::StateUpdateKind`] into an [`BootstrapStateUpdateKind`], which is
1397/// identical to [`mz_catalog::memory::objects::StateUpdateKind`] except it doesn't have a
1398/// temporary item variant and does implement [`std::cmp::Ord`].
1399///
1400/// WARNING: Do not call outside of startup.
1401pub(crate) fn into_consolidatable_updates_startup(
1402    updates: Vec<StateUpdate>,
1403    ts: Timestamp,
1404) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1405    updates
1406        .into_iter()
1407        .map(|StateUpdate { kind, ts: _, diff }| {
1408            let kind: BootstrapStateUpdateKind = kind
1409                .try_into()
1410                .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1411            (kind, ts, Diff::from(diff))
1412        })
1413        .collect()
1414}