mz_adapter/catalog/
apply.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to applying updates from a [`mz_catalog::durable::DurableCatalogState`] to a
11//! [`CatalogState`].
12
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25    BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28    ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29    SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
35    NetworkPolicy, Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36    TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
40use mz_controller_types::ClusterId;
41use mz_expr::MirScalarExpr;
42use mz_ore::collections::CollectionExt;
43use mz_ore::tracing::OpenTelemetryContext;
44use mz_ore::{assert_none, instrument, soft_assert_no_log};
45use mz_pgrepr::oid::INVALID_OID;
46use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
47use mz_repr::role_id::RoleId;
48use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
49use mz_sql::catalog::CatalogError as SqlCatalogError;
50use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
51use mz_sql::names::{
52    FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
53    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
54};
55use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
56use mz_sql::session::vars::{VarError, VarInput};
57use mz_sql::{plan, rbac};
58use mz_sql_parser::ast::Expr;
59use mz_storage_types::sources::Timeline;
60use tracing::{info_span, warn};
61
62use crate::AdapterError;
63use crate::catalog::state::LocalExpressionCache;
64use crate::catalog::{BuiltinTableUpdate, CatalogState};
65use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
66use crate::util::index_sql;
67
/// Maintains the state of retractions while applying catalog state updates for a single timestamp.
/// [`CatalogState`] maintains denormalized state for certain catalog objects. Updating an object
/// results in applying a retraction for that object followed by applying an addition for that
/// object. When applying those additions it can be extremely expensive to re-build that
/// denormalized state from scratch. To avoid that issue we stash the denormalized state from
/// retractions, so it can be used during additions.
///
/// Not all objects maintain denormalized state, so we only stash the retractions for the subset of
/// objects that maintain denormalized state.
// TODO(jkosh44) It might be simpler or more future proof to include all object types here, even if
// the update step is a no-op for certain types.
#[derive(Debug, Clone, Default)]
struct InProgressRetractions {
    /// Stashed in-memory roles, keyed by their durable role key.
    roles: BTreeMap<RoleKey, Role>,
    /// Stashed in-memory role authentication entries, keyed by their durable key.
    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
    /// Stashed in-memory databases, keyed by their durable database key.
    databases: BTreeMap<DatabaseKey, Database>,
    /// Stashed in-memory schemas, keyed by their durable schema key.
    schemas: BTreeMap<SchemaKey, Schema>,
    /// Stashed in-memory clusters, keyed by their durable cluster key.
    clusters: BTreeMap<ClusterKey, Cluster>,
    /// Stashed in-memory network policies, keyed by their durable key.
    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
    /// Stashed catalog entries for items, keyed by their durable item key.
    items: BTreeMap<ItemKey, CatalogEntry>,
    /// Stashed catalog entries keyed by item ID.
    // NOTE(review): presumably holds retracted temporary items; the code that fills it is not
    // visible here, confirm against the temporary item handler.
    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
    /// Catalog entries dropped by a retraction of an introspection source index, keyed by the
    /// entry's item ID, so that a matching addition can re-use the entry.
    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
    /// Catalog entries dropped by a retraction of a system object mapping, keyed by item ID, so
    /// that a matching addition can re-insert the entry.
    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}
92
93impl CatalogState {
94    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
95    ///
96    /// Returns builtin table updates corresponding to the changes to catalog state.
97    #[must_use]
98    #[instrument]
99    pub(crate) async fn apply_updates(
100        &mut self,
101        updates: Vec<StateUpdate>,
102        local_expression_cache: &mut LocalExpressionCache,
103    ) -> (
104        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
105        Vec<ParsedStateUpdate>,
106    ) {
107        let mut builtin_table_updates = Vec::with_capacity(updates.len());
108        let mut catalog_updates = Vec::with_capacity(updates.len());
109
110        // First, consolidate updates. The code that applies parsed state
111        // updates _requires_ that the given updates are consolidated. There
112        // must be at most one addition and/or one retraction for a given item,
113        // as identified by that items ID type.
114        let updates = Self::consolidate_updates(updates);
115
116        // Apply updates in groups, according to their timestamps.
117        let mut groups: Vec<Vec<_>> = Vec::new();
118        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
119            // Bring the updates into the pseudo-topological order that we need
120            // for updating our in-memory state and generating builtin table
121            // updates.
122            let updates = sort_updates(updates.collect());
123            groups.push(updates);
124        }
125
126        for updates in groups {
127            let mut apply_state = ApplyState::Updates(Vec::new());
128            let mut retractions = InProgressRetractions::default();
129
130            for update in updates {
131                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
132                    .step(
133                        ApplyState::new(update),
134                        self,
135                        &mut retractions,
136                        local_expression_cache,
137                    )
138                    .await;
139                apply_state = next_apply_state;
140                builtin_table_updates.extend(builtin_table_update);
141                catalog_updates.extend(catalog_update);
142            }
143
144            // Apply remaining state.
145            let (builtin_table_update, catalog_update) = apply_state
146                .apply(self, &mut retractions, local_expression_cache)
147                .await;
148            builtin_table_updates.extend(builtin_table_update);
149            catalog_updates.extend(catalog_update);
150        }
151
152        (builtin_table_updates, catalog_updates)
153    }
154
155    /// It can happen that the sequencing logic creates "fluctuating" updates
156    /// for a given catalog ID. For example, when doing a `DROP OWNED BY ...`,
157    /// for a table, there will be a retraction of the original table state,
158    /// then an addition for the same table but stripped of some of the roles
159    /// and access things, and then a retraction for that intermediate table
160    /// state. By consolidating, the intermediate state addition/retraction will
161    /// cancel out and we'll only see the retraction for the original state.
162    fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
163        let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
164            .into_iter()
165            .map(|update| (update.kind, update.ts, update.diff.into()))
166            .collect_vec();
167
168        consolidate_updates(&mut updates);
169
170        updates
171            .into_iter()
172            .map(|(kind, ts, diff)| StateUpdate {
173                kind,
174                ts,
175                diff: diff
176                    .try_into()
177                    .expect("catalog state cannot have diff other than -1 or 1"),
178            })
179            .collect_vec()
180    }
181
182    #[instrument(level = "debug")]
183    fn apply_updates_inner(
184        &mut self,
185        updates: Vec<StateUpdate>,
186        retractions: &mut InProgressRetractions,
187        local_expression_cache: &mut LocalExpressionCache,
188    ) -> Result<
189        (
190            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
191            Vec<ParsedStateUpdate>,
192        ),
193        CatalogError,
194    > {
195        soft_assert_no_log!(
196            updates.iter().map(|update| update.ts).all_equal(),
197            "all timestamps should be equal: {updates:?}"
198        );
199
200        let mut update_system_config = false;
201
202        let mut builtin_table_updates = Vec::with_capacity(updates.len());
203        let mut catalog_updates = Vec::new();
204
205        for state_update in updates {
206            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
207                update_system_config = true;
208            }
209
210            match state_update.diff {
211                StateDiff::Retraction => {
212                    // We want the parsed catalog updates to match the state of
213                    // the catalog _before_ applying a retraction. So that we can
214                    // still have useful in-memory state to work with.
215                    if let Some(update) =
216                        parsed_state_updates::parse_state_update(self, state_update.clone())
217                    {
218                        catalog_updates.push(update);
219                    }
220
221                    // We want the builtin table retraction to match the state of the catalog
222                    // before applying the update.
223                    builtin_table_updates.extend(self.generate_builtin_table_update(
224                        state_update.kind.clone(),
225                        state_update.diff,
226                    ));
227                    self.apply_update(
228                        state_update.kind,
229                        state_update.diff,
230                        retractions,
231                        local_expression_cache,
232                    )?;
233                }
234                StateDiff::Addition => {
235                    self.apply_update(
236                        state_update.kind.clone(),
237                        state_update.diff,
238                        retractions,
239                        local_expression_cache,
240                    )?;
241                    // We want the builtin table addition to match the state of
242                    // the catalog after applying the update. So that we already
243                    // have useful in-memory state to work with.
244                    builtin_table_updates.extend(self.generate_builtin_table_update(
245                        state_update.kind.clone(),
246                        state_update.diff,
247                    ));
248
249                    // We want the parsed catalog updates to match the state of
250                    // the catalog _after_ applying an addition.
251                    if let Some(update) =
252                        parsed_state_updates::parse_state_update(self, state_update.clone())
253                    {
254                        catalog_updates.push(update);
255                    }
256                }
257            }
258        }
259
260        if update_system_config {
261            self.system_configuration.dyncfg_updates();
262        }
263
264        Ok((builtin_table_updates, catalog_updates))
265    }
266
267    #[instrument(level = "debug")]
268    fn apply_update(
269        &mut self,
270        kind: StateUpdateKind,
271        diff: StateDiff,
272        retractions: &mut InProgressRetractions,
273        local_expression_cache: &mut LocalExpressionCache,
274    ) -> Result<(), CatalogError> {
275        match kind {
276            StateUpdateKind::Role(role) => {
277                self.apply_role_update(role, diff, retractions);
278            }
279            StateUpdateKind::RoleAuth(role_auth) => {
280                self.apply_role_auth_update(role_auth, diff, retractions);
281            }
282            StateUpdateKind::Database(database) => {
283                self.apply_database_update(database, diff, retractions);
284            }
285            StateUpdateKind::Schema(schema) => {
286                self.apply_schema_update(schema, diff, retractions);
287            }
288            StateUpdateKind::DefaultPrivilege(default_privilege) => {
289                self.apply_default_privilege_update(default_privilege, diff, retractions);
290            }
291            StateUpdateKind::SystemPrivilege(system_privilege) => {
292                self.apply_system_privilege_update(system_privilege, diff, retractions);
293            }
294            StateUpdateKind::SystemConfiguration(system_configuration) => {
295                self.apply_system_configuration_update(system_configuration, diff, retractions);
296            }
297            StateUpdateKind::Cluster(cluster) => {
298                self.apply_cluster_update(cluster, diff, retractions);
299            }
300            StateUpdateKind::NetworkPolicy(network_policy) => {
301                self.apply_network_policy_update(network_policy, diff, retractions);
302            }
303            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
304                self.apply_introspection_source_index_update(
305                    introspection_source_index,
306                    diff,
307                    retractions,
308                );
309            }
310            StateUpdateKind::ClusterReplica(cluster_replica) => {
311                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
312            }
313            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
314                self.apply_system_object_mapping_update(
315                    system_object_mapping,
316                    diff,
317                    retractions,
318                    local_expression_cache,
319                );
320            }
321            StateUpdateKind::TemporaryItem(item) => {
322                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
323            }
324            StateUpdateKind::Item(item) => {
325                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
326            }
327            StateUpdateKind::Comment(comment) => {
328                self.apply_comment_update(comment, diff, retractions);
329            }
330            StateUpdateKind::SourceReferences(source_reference) => {
331                self.apply_source_references_update(source_reference, diff, retractions);
332            }
333            StateUpdateKind::AuditLog(_audit_log) => {
334                // Audit logs are not stored in-memory.
335            }
336            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
337                self.apply_storage_collection_metadata_update(
338                    storage_collection_metadata,
339                    diff,
340                    retractions,
341                );
342            }
343            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
344                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
345            }
346        }
347
348        Ok(())
349    }
350
351    #[instrument(level = "debug")]
352    fn apply_role_auth_update(
353        &mut self,
354        role_auth: mz_catalog::durable::RoleAuth,
355        diff: StateDiff,
356        retractions: &mut InProgressRetractions,
357    ) {
358        apply_with_update(
359            &mut self.role_auth_by_id,
360            role_auth,
361            |role_auth| role_auth.role_id,
362            diff,
363            &mut retractions.role_auths,
364        );
365    }
366
367    #[instrument(level = "debug")]
368    fn apply_role_update(
369        &mut self,
370        role: mz_catalog::durable::Role,
371        diff: StateDiff,
372        retractions: &mut InProgressRetractions,
373    ) {
374        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
375        apply_with_update(
376            &mut self.roles_by_id,
377            role,
378            |role| role.id,
379            diff,
380            &mut retractions.roles,
381        );
382    }
383
384    #[instrument(level = "debug")]
385    fn apply_database_update(
386        &mut self,
387        database: mz_catalog::durable::Database,
388        diff: StateDiff,
389        retractions: &mut InProgressRetractions,
390    ) {
391        apply_inverted_lookup(
392            &mut self.database_by_name,
393            &database.name,
394            database.id,
395            diff,
396        );
397        apply_with_update(
398            &mut self.database_by_id,
399            database,
400            |database| database.id,
401            diff,
402            &mut retractions.databases,
403        );
404    }
405
406    #[instrument(level = "debug")]
407    fn apply_schema_update(
408        &mut self,
409        schema: mz_catalog::durable::Schema,
410        diff: StateDiff,
411        retractions: &mut InProgressRetractions,
412    ) {
413        let (schemas_by_id, schemas_by_name) = match &schema.database_id {
414            Some(database_id) => {
415                let db = self
416                    .database_by_id
417                    .get_mut(database_id)
418                    .expect("catalog out of sync");
419                (&mut db.schemas_by_id, &mut db.schemas_by_name)
420            }
421            None => (
422                &mut self.ambient_schemas_by_id,
423                &mut self.ambient_schemas_by_name,
424            ),
425        };
426        apply_inverted_lookup(schemas_by_name, &schema.name, schema.id, diff);
427        apply_with_update(
428            schemas_by_id,
429            schema,
430            |schema| schema.id,
431            diff,
432            &mut retractions.schemas,
433        );
434    }
435
436    #[instrument(level = "debug")]
437    fn apply_default_privilege_update(
438        &mut self,
439        default_privilege: mz_catalog::durable::DefaultPrivilege,
440        diff: StateDiff,
441        _retractions: &mut InProgressRetractions,
442    ) {
443        match diff {
444            StateDiff::Addition => self
445                .default_privileges
446                .grant(default_privilege.object, default_privilege.acl_item),
447            StateDiff::Retraction => self
448                .default_privileges
449                .revoke(&default_privilege.object, &default_privilege.acl_item),
450        }
451    }
452
453    #[instrument(level = "debug")]
454    fn apply_system_privilege_update(
455        &mut self,
456        system_privilege: MzAclItem,
457        diff: StateDiff,
458        _retractions: &mut InProgressRetractions,
459    ) {
460        match diff {
461            StateDiff::Addition => self.system_privileges.grant(system_privilege),
462            StateDiff::Retraction => self.system_privileges.revoke(&system_privilege),
463        }
464    }
465
466    #[instrument(level = "debug")]
467    fn apply_system_configuration_update(
468        &mut self,
469        system_configuration: mz_catalog::durable::SystemConfiguration,
470        diff: StateDiff,
471        _retractions: &mut InProgressRetractions,
472    ) {
473        let res = match diff {
474            StateDiff::Addition => self.insert_system_configuration(
475                &system_configuration.name,
476                VarInput::Flat(&system_configuration.value),
477            ),
478            StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
479        };
480        match res {
481            Ok(_) => (),
482            // When system variables are deleted, nothing deletes them from the underlying
483            // durable catalog, which isn't great. Still, we need to be able to ignore
484            // unknown variables.
485            Err(Error {
486                kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
487            }) => {
488                warn!(%name, "unknown system parameter from catalog storage");
489            }
490            Err(e) => panic!("unable to update system variable: {e:?}"),
491        }
492    }
493
494    #[instrument(level = "debug")]
495    fn apply_cluster_update(
496        &mut self,
497        cluster: mz_catalog::durable::Cluster,
498        diff: StateDiff,
499        retractions: &mut InProgressRetractions,
500    ) {
501        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
502        apply_with_update(
503            &mut self.clusters_by_id,
504            cluster,
505            |cluster| cluster.id,
506            diff,
507            &mut retractions.clusters,
508        );
509    }
510
511    #[instrument(level = "debug")]
512    fn apply_network_policy_update(
513        &mut self,
514        policy: mz_catalog::durable::NetworkPolicy,
515        diff: StateDiff,
516        retractions: &mut InProgressRetractions,
517    ) {
518        apply_inverted_lookup(
519            &mut self.network_policies_by_name,
520            &policy.name,
521            policy.id,
522            diff,
523        );
524        apply_with_update(
525            &mut self.network_policies_by_id,
526            policy,
527            |policy| policy.id,
528            diff,
529            &mut retractions.network_policies,
530        );
531    }
532
533    #[instrument(level = "debug")]
534    fn apply_introspection_source_index_update(
535        &mut self,
536        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
537        diff: StateDiff,
538        retractions: &mut InProgressRetractions,
539    ) {
540        let cluster = self
541            .clusters_by_id
542            .get_mut(&introspection_source_index.cluster_id)
543            .expect("catalog out of sync");
544        let log = BUILTIN_LOG_LOOKUP
545            .get(introspection_source_index.name.as_str())
546            .expect("missing log");
547        apply_inverted_lookup(
548            &mut cluster.log_indexes,
549            &log.variant,
550            introspection_source_index.index_id,
551            diff,
552        );
553
554        match diff {
555            StateDiff::Addition => {
556                if let Some(mut entry) = retractions
557                    .introspection_source_indexes
558                    .remove(&introspection_source_index.item_id)
559                {
560                    // This should only happen during startup as a result of builtin migrations. We
561                    // create a new index item and replace the old one with it.
562                    let (index_name, index) = self.create_introspection_source_index(
563                        introspection_source_index.cluster_id,
564                        log,
565                        introspection_source_index.index_id,
566                    );
567                    assert_eq!(entry.id, introspection_source_index.item_id);
568                    assert_eq!(entry.oid, introspection_source_index.oid);
569                    assert_eq!(entry.name, index_name);
570                    entry.item = index;
571                    self.insert_entry(entry);
572                } else {
573                    self.insert_introspection_source_index(
574                        introspection_source_index.cluster_id,
575                        log,
576                        introspection_source_index.item_id,
577                        introspection_source_index.index_id,
578                        introspection_source_index.oid,
579                    );
580                }
581            }
582            StateDiff::Retraction => {
583                let entry = self.drop_item(introspection_source_index.item_id);
584                retractions
585                    .introspection_source_indexes
586                    .insert(entry.id, entry);
587            }
588        }
589    }
590
591    #[instrument(level = "debug")]
592    fn apply_cluster_replica_update(
593        &mut self,
594        cluster_replica: mz_catalog::durable::ClusterReplica,
595        diff: StateDiff,
596        _retractions: &mut InProgressRetractions,
597    ) {
598        let cluster = self
599            .clusters_by_id
600            .get(&cluster_replica.cluster_id)
601            .expect("catalog out of sync");
602        let azs = cluster.availability_zones();
603        let location = self
604            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
605            .expect("catalog in unexpected state");
606        let cluster = self
607            .clusters_by_id
608            .get_mut(&cluster_replica.cluster_id)
609            .expect("catalog out of sync");
610        apply_inverted_lookup(
611            &mut cluster.replica_id_by_name_,
612            &cluster_replica.name,
613            cluster_replica.replica_id,
614            diff,
615        );
616        match diff {
617            StateDiff::Retraction => {
618                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
619                assert!(
620                    prev.is_some(),
621                    "retraction does not match existing value: {:?}",
622                    cluster_replica.replica_id
623                );
624            }
625            StateDiff::Addition => {
626                let logging = ReplicaLogging {
627                    log_logging: cluster_replica.config.logging.log_logging,
628                    interval: cluster_replica.config.logging.interval,
629                };
630                let config = ReplicaConfig {
631                    location,
632                    compute: ComputeReplicaConfig { logging },
633                };
634                let mem_cluster_replica = ClusterReplica {
635                    name: cluster_replica.name.clone(),
636                    cluster_id: cluster_replica.cluster_id,
637                    replica_id: cluster_replica.replica_id,
638                    config,
639                    owner_id: cluster_replica.owner_id,
640                };
641                let prev = cluster
642                    .replicas_by_id_
643                    .insert(cluster_replica.replica_id, mem_cluster_replica);
644                assert_eq!(
645                    prev, None,
646                    "values must be explicitly retracted before inserting a new value: {:?}",
647                    cluster_replica.replica_id
648                );
649            }
650        }
651    }
652
653    #[instrument(level = "debug")]
654    fn apply_system_object_mapping_update(
655        &mut self,
656        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
657        diff: StateDiff,
658        retractions: &mut InProgressRetractions,
659        local_expression_cache: &mut LocalExpressionCache,
660    ) {
661        let item_id = system_object_mapping.unique_identifier.catalog_id;
662        let global_id = system_object_mapping.unique_identifier.global_id;
663
664        if system_object_mapping.unique_identifier.runtime_alterable() {
665            // Runtime-alterable system objects have real entries in the items
666            // collection and so get handled through the normal `insert_item`
667            // and `drop_item` code paths.
668            return;
669        }
670
671        if let StateDiff::Retraction = diff {
672            let entry = self.drop_item(item_id);
673            retractions.system_object_mappings.insert(item_id, entry);
674            return;
675        }
676
677        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
678            // This implies that we updated the fingerprint for some builtin item. The retraction
679            // was parsed, planned, and optimized using the compiled in definition, not the
680            // definition from a previous version. So we can just stick the old entry back into the
681            // catalog.
682            self.insert_entry(entry);
683            return;
684        }
685
686        let builtin = BUILTIN_LOOKUP
687            .get(&system_object_mapping.description)
688            .expect("missing builtin")
689            .1;
690        let schema_name = builtin.schema();
691        let schema_id = self
692            .ambient_schemas_by_name
693            .get(schema_name)
694            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
695        let name = QualifiedItemName {
696            qualifiers: ItemQualifiers {
697                database_spec: ResolvedDatabaseSpecifier::Ambient,
698                schema_spec: SchemaSpecifier::Id(*schema_id),
699            },
700            item: builtin.name().into(),
701        };
702        match builtin {
703            Builtin::Log(log) => {
704                let mut acl_items = vec![rbac::owner_privilege(
705                    mz_sql::catalog::ObjectType::Source,
706                    MZ_SYSTEM_ROLE_ID,
707                )];
708                acl_items.extend_from_slice(&log.access);
709                self.insert_item(
710                    item_id,
711                    log.oid,
712                    name.clone(),
713                    CatalogItem::Log(Log {
714                        variant: log.variant,
715                        global_id,
716                    }),
717                    MZ_SYSTEM_ROLE_ID,
718                    PrivilegeMap::from_mz_acl_items(acl_items),
719                );
720            }
721
722            Builtin::Table(table) => {
723                let mut acl_items = vec![rbac::owner_privilege(
724                    mz_sql::catalog::ObjectType::Table,
725                    MZ_SYSTEM_ROLE_ID,
726                )];
727                acl_items.extend_from_slice(&table.access);
728
729                self.insert_item(
730                    item_id,
731                    table.oid,
732                    name.clone(),
733                    CatalogItem::Table(Table {
734                        create_sql: None,
735                        desc: VersionedRelationDesc::new(table.desc.clone()),
736                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
737                        conn_id: None,
738                        resolved_ids: ResolvedIds::empty(),
739                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
740                            || {
741                                self.system_config()
742                                    .metrics_retention()
743                                    .try_into()
744                                    .expect("invalid metrics retention")
745                            },
746                        ),
747                        is_retained_metrics_object: table.is_retained_metrics_object,
748                        data_source: TableDataSource::TableWrites {
749                            defaults: vec![Expr::null(); table.desc.arity()],
750                        },
751                    }),
752                    MZ_SYSTEM_ROLE_ID,
753                    PrivilegeMap::from_mz_acl_items(acl_items),
754                );
755            }
756            Builtin::Index(index) => {
757                let custom_logical_compaction_window =
758                    index.is_retained_metrics_object.then(|| {
759                        self.system_config()
760                            .metrics_retention()
761                            .try_into()
762                            .expect("invalid metrics retention")
763                    });
764                // Indexes can't be versioned.
765                let versions = BTreeMap::new();
766
767                let item = self
768                    .parse_item(
769                        global_id,
770                        &index.create_sql(),
771                        &versions,
772                        None,
773                        index.is_retained_metrics_object,
774                        custom_logical_compaction_window,
775                        local_expression_cache,
776                        None,
777                    )
778                    .unwrap_or_else(|e| {
779                        panic!(
780                            "internal error: failed to load bootstrap index:\n\
781                                    {}\n\
782                                    error:\n\
783                                    {:?}\n\n\
784                                    make sure that the schema name is specified in the builtin index's create sql statement.",
785                            index.name, e
786                        )
787                    });
788                let CatalogItem::Index(_) = item else {
789                    panic!(
790                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
791                        index.name
792                    );
793                };
794
795                self.insert_item(
796                    item_id,
797                    index.oid,
798                    name,
799                    item,
800                    MZ_SYSTEM_ROLE_ID,
801                    PrivilegeMap::default(),
802                );
803            }
804            Builtin::View(_) => {
805                // parse_views is responsible for inserting all builtin views.
806                unreachable!("views added elsewhere");
807            }
808
809            // Note: Element types must be loaded before array types.
810            Builtin::Type(typ) => {
811                let typ = self.resolve_builtin_type_references(typ);
812                if let CatalogType::Array { element_reference } = typ.details.typ {
813                    let entry = self.get_entry_mut(&element_reference);
814                    let item_type = match &mut entry.item {
815                        CatalogItem::Type(item_type) => item_type,
816                        _ => unreachable!("types can only reference other types"),
817                    };
818                    item_type.details.array_id = Some(item_id);
819                }
820
821                let schema_id = self.resolve_system_schema(typ.schema);
822
823                self.insert_item(
824                    item_id,
825                    typ.oid,
826                    QualifiedItemName {
827                        qualifiers: ItemQualifiers {
828                            database_spec: ResolvedDatabaseSpecifier::Ambient,
829                            schema_spec: SchemaSpecifier::Id(schema_id),
830                        },
831                        item: typ.name.to_owned(),
832                    },
833                    CatalogItem::Type(Type {
834                        create_sql: None,
835                        global_id,
836                        details: typ.details.clone(),
837                        resolved_ids: ResolvedIds::empty(),
838                    }),
839                    MZ_SYSTEM_ROLE_ID,
840                    PrivilegeMap::from_mz_acl_items(vec![
841                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
842                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
843                    ]),
844                );
845            }
846
847            Builtin::Func(func) => {
848                // This OID is never used. `func` has a `Vec` of implementations and
849                // each implementation has its own OID. Those are the OIDs that are
850                // actually used by the system.
851                let oid = INVALID_OID;
852                self.insert_item(
853                    item_id,
854                    oid,
855                    name.clone(),
856                    CatalogItem::Func(Func {
857                        inner: func.inner,
858                        global_id,
859                    }),
860                    MZ_SYSTEM_ROLE_ID,
861                    PrivilegeMap::default(),
862                );
863            }
864
865            Builtin::Source(coll) => {
866                let mut acl_items = vec![rbac::owner_privilege(
867                    mz_sql::catalog::ObjectType::Source,
868                    MZ_SYSTEM_ROLE_ID,
869                )];
870                acl_items.extend_from_slice(&coll.access);
871
872                self.insert_item(
873                    item_id,
874                    coll.oid,
875                    name.clone(),
876                    CatalogItem::Source(Source {
877                        create_sql: None,
878                        data_source: DataSourceDesc::Introspection(coll.data_source),
879                        desc: coll.desc.clone(),
880                        global_id,
881                        timeline: Timeline::EpochMilliseconds,
882                        resolved_ids: ResolvedIds::empty(),
883                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
884                            || {
885                                self.system_config()
886                                    .metrics_retention()
887                                    .try_into()
888                                    .expect("invalid metrics retention")
889                            },
890                        ),
891                        is_retained_metrics_object: coll.is_retained_metrics_object,
892                    }),
893                    MZ_SYSTEM_ROLE_ID,
894                    PrivilegeMap::from_mz_acl_items(acl_items),
895                );
896            }
897            Builtin::ContinualTask(ct) => {
898                let mut acl_items = vec![rbac::owner_privilege(
899                    mz_sql::catalog::ObjectType::Source,
900                    MZ_SYSTEM_ROLE_ID,
901                )];
902                acl_items.extend_from_slice(&ct.access);
903                // Continual Tasks can't be versioned.
904                let versions = BTreeMap::new();
905
906                let item = self
907                    .parse_item(
908                        global_id,
909                        &ct.create_sql(),
910                        &versions,
911                        None,
912                        false,
913                        None,
914                        local_expression_cache,
915                        None,
916                    )
917                    .unwrap_or_else(|e| {
918                        panic!(
919                            "internal error: failed to load bootstrap continual task:\n\
920                                    {}\n\
921                                    error:\n\
922                                    {:?}\n\n\
923                                    make sure that the schema name is specified in the builtin continual task's create sql statement.",
924                            ct.name, e
925                        )
926                    });
927                let CatalogItem::ContinualTask(_) = &item else {
928                    panic!(
929                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
930                        ct.name
931                    );
932                };
933
934                self.insert_item(
935                    item_id,
936                    ct.oid,
937                    name,
938                    item,
939                    MZ_SYSTEM_ROLE_ID,
940                    PrivilegeMap::from_mz_acl_items(acl_items),
941                );
942            }
943            Builtin::Connection(connection) => {
944                // Connections can't be versioned.
945                let versions = BTreeMap::new();
946                let mut item = self
947                    .parse_item(
948                        global_id,
949                        connection.sql,
950                        &versions,
951                        None,
952                        false,
953                        None,
954                        local_expression_cache,
955                        None,
956                    )
957                    .unwrap_or_else(|e| {
958                        panic!(
959                            "internal error: failed to load bootstrap connection:\n\
960                                    {}\n\
961                                    error:\n\
962                                    {:?}\n\n\
963                                    make sure that the schema name is specified in the builtin connection's create sql statement.",
964                            connection.name, e
965                        )
966                    });
967                let CatalogItem::Connection(_) = &mut item else {
968                    panic!(
969                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
970                        connection.name
971                    );
972                };
973
974                let mut acl_items = vec![rbac::owner_privilege(
975                    mz_sql::catalog::ObjectType::Connection,
976                    connection.owner_id.clone(),
977                )];
978                acl_items.extend_from_slice(connection.access);
979
980                self.insert_item(
981                    item_id,
982                    connection.oid,
983                    name.clone(),
984                    item,
985                    connection.owner_id.clone(),
986                    PrivilegeMap::from_mz_acl_items(acl_items),
987                );
988            }
989        }
990    }
991
992    #[instrument(level = "debug")]
993    fn apply_temporary_item_update(
994        &mut self,
995        temporary_item: TemporaryItem,
996        diff: StateDiff,
997        retractions: &mut InProgressRetractions,
998        local_expression_cache: &mut LocalExpressionCache,
999    ) {
1000        match diff {
1001            StateDiff::Addition => {
1002                let TemporaryItem {
1003                    id,
1004                    oid,
1005                    global_id,
1006                    schema_id,
1007                    name,
1008                    conn_id,
1009                    create_sql,
1010                    owner_id,
1011                    privileges,
1012                    extra_versions,
1013                } = temporary_item;
1014                // Lazily create the temporary schema if it doesn't exist yet.
1015                // We need the conn_id to create the schema, and it should always be Some for temp items.
1016                let temp_conn_id = conn_id
1017                    .as_ref()
1018                    .expect("temporary items must have a connection id");
1019                if !self.temporary_schemas.contains_key(temp_conn_id) {
1020                    self.create_temporary_schema(temp_conn_id, owner_id)
1021                        .expect("failed to create temporary schema");
1022                }
1023                let schema = self.find_temp_schema(&schema_id);
1024                let name = QualifiedItemName {
1025                    qualifiers: ItemQualifiers {
1026                        database_spec: schema.database().clone(),
1027                        schema_spec: schema.id().clone(),
1028                    },
1029                    item: name.clone(),
1030                };
1031
1032                let entry = match retractions.temp_items.remove(&id) {
1033                    Some(mut retraction) => {
1034                        assert_eq!(retraction.id, id);
1035
1036                        // We only reparse the SQL if it's changed. Otherwise, we use the existing
1037                        // item. This is a performance optimization and not needed for correctness.
1038                        // This makes it difficult to use the `UpdateFrom` trait, but the structure
1039                        // is still the same as the trait.
1040                        if retraction.create_sql() != create_sql {
1041                            let mut catalog_item = self
1042                                .deserialize_item(
1043                                    global_id,
1044                                    &create_sql,
1045                                    &extra_versions,
1046                                    local_expression_cache,
1047                                    Some(retraction.item),
1048                                )
1049                                .unwrap_or_else(|e| {
1050                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
1051                                });
1052                            // Have to patch up the item because parsing doesn't
1053                            // take into account temporary schemas/conn_id.
1054                            // NOTE(aljoscha): I don't like how we're patching
1055                            // this in here, but it's but one of the ways in
1056                            // which temporary items are a bit weird. So, here
1057                            // we are ...
1058                            catalog_item.set_conn_id(conn_id);
1059                            retraction.item = catalog_item;
1060                        }
1061
1062                        retraction.id = id;
1063                        retraction.oid = oid;
1064                        retraction.name = name;
1065                        retraction.owner_id = owner_id;
1066                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1067                        retraction
1068                    }
1069                    None => {
1070                        let mut catalog_item = self
1071                            .deserialize_item(
1072                                global_id,
1073                                &create_sql,
1074                                &extra_versions,
1075                                local_expression_cache,
1076                                None,
1077                            )
1078                            .unwrap_or_else(|e| {
1079                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1080                            });
1081
1082                        // Have to patch up the item because parsing doesn't
1083                        // take into account temporary schemas/conn_id.
1084                        // NOTE(aljoscha): I don't like how we're patching this
1085                        // in here, but it's but one of the ways in which
1086                        // temporary items are a bit weird. So, here we are ...
1087                        catalog_item.set_conn_id(conn_id);
1088
1089                        CatalogEntry {
1090                            item: catalog_item,
1091                            referenced_by: Vec::new(),
1092                            used_by: Vec::new(),
1093                            id,
1094                            oid,
1095                            name,
1096                            owner_id,
1097                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1098                        }
1099                    }
1100                };
1101                self.insert_entry(entry);
1102            }
1103            StateDiff::Retraction => {
1104                let entry = self.drop_item(temporary_item.id);
1105                retractions.temp_items.insert(temporary_item.id, entry);
1106            }
1107        }
1108    }
1109
1110    #[instrument(level = "debug")]
1111    fn apply_item_update(
1112        &mut self,
1113        item: mz_catalog::durable::Item,
1114        diff: StateDiff,
1115        retractions: &mut InProgressRetractions,
1116        local_expression_cache: &mut LocalExpressionCache,
1117    ) -> Result<(), CatalogError> {
1118        match diff {
1119            StateDiff::Addition => {
1120                let key = item.key();
1121                let mz_catalog::durable::Item {
1122                    id,
1123                    oid,
1124                    global_id,
1125                    schema_id,
1126                    name,
1127                    create_sql,
1128                    owner_id,
1129                    privileges,
1130                    extra_versions,
1131                } = item;
1132                let schema = self.find_non_temp_schema(&schema_id);
1133                let name = QualifiedItemName {
1134                    qualifiers: ItemQualifiers {
1135                        database_spec: schema.database().clone(),
1136                        schema_spec: schema.id().clone(),
1137                    },
1138                    item: name.clone(),
1139                };
1140                let entry = match retractions.items.remove(&key) {
1141                    Some(retraction) => {
1142                        assert_eq!(retraction.id, item.id);
1143
1144                        let item = self
1145                            .deserialize_item(
1146                                global_id,
1147                                &create_sql,
1148                                &extra_versions,
1149                                local_expression_cache,
1150                                Some(retraction.item),
1151                            )
1152                            .unwrap_or_else(|e| {
1153                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1154                            });
1155
1156                        CatalogEntry {
1157                            item,
1158                            id,
1159                            oid,
1160                            name,
1161                            owner_id,
1162                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1163                            referenced_by: retraction.referenced_by,
1164                            used_by: retraction.used_by,
1165                        }
1166                    }
1167                    None => {
1168                        let catalog_item = self
1169                            .deserialize_item(
1170                                global_id,
1171                                &create_sql,
1172                                &extra_versions,
1173                                local_expression_cache,
1174                                None,
1175                            )
1176                            .unwrap_or_else(|e| {
1177                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1178                            });
1179                        CatalogEntry {
1180                            item: catalog_item,
1181                            referenced_by: Vec::new(),
1182                            used_by: Vec::new(),
1183                            id,
1184                            oid,
1185                            name,
1186                            owner_id,
1187                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1188                        }
1189                    }
1190                };
1191
1192                self.insert_entry(entry);
1193            }
1194            StateDiff::Retraction => {
1195                let entry = self.drop_item(item.id);
1196                let key = item.into_key_value().0;
1197                retractions.items.insert(key, entry);
1198            }
1199        }
1200        Ok(())
1201    }
1202
1203    #[instrument(level = "debug")]
1204    fn apply_comment_update(
1205        &mut self,
1206        comment: mz_catalog::durable::Comment,
1207        diff: StateDiff,
1208        _retractions: &mut InProgressRetractions,
1209    ) {
1210        match diff {
1211            StateDiff::Addition => {
1212                let prev = self.comments.update_comment(
1213                    comment.object_id,
1214                    comment.sub_component,
1215                    Some(comment.comment),
1216                );
1217                assert_eq!(
1218                    prev, None,
1219                    "values must be explicitly retracted before inserting a new value"
1220                );
1221            }
1222            StateDiff::Retraction => {
1223                let prev =
1224                    self.comments
1225                        .update_comment(comment.object_id, comment.sub_component, None);
1226                assert_eq!(
1227                    prev,
1228                    Some(comment.comment),
1229                    "retraction does not match existing value: ({:?}, {:?})",
1230                    comment.object_id,
1231                    comment.sub_component,
1232                );
1233            }
1234        }
1235    }
1236
1237    #[instrument(level = "debug")]
1238    fn apply_source_references_update(
1239        &mut self,
1240        source_references: mz_catalog::durable::SourceReferences,
1241        diff: StateDiff,
1242        _retractions: &mut InProgressRetractions,
1243    ) {
1244        match diff {
1245            StateDiff::Addition => {
1246                let prev = self
1247                    .source_references
1248                    .insert(source_references.source_id, source_references.into());
1249                assert!(
1250                    prev.is_none(),
1251                    "values must be explicitly retracted before inserting a new value: {prev:?}"
1252                );
1253            }
1254            StateDiff::Retraction => {
1255                let prev = self.source_references.remove(&source_references.source_id);
1256                assert!(
1257                    prev.is_some(),
1258                    "retraction for a non-existent existing value: {source_references:?}"
1259                );
1260            }
1261        }
1262    }
1263
1264    #[instrument(level = "debug")]
1265    fn apply_storage_collection_metadata_update(
1266        &mut self,
1267        storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1268        diff: StateDiff,
1269        _retractions: &mut InProgressRetractions,
1270    ) {
1271        apply_inverted_lookup(
1272            &mut self.storage_metadata.collection_metadata,
1273            &storage_collection_metadata.id,
1274            storage_collection_metadata.shard,
1275            diff,
1276        );
1277    }
1278
1279    #[instrument(level = "debug")]
1280    fn apply_unfinalized_shard_update(
1281        &mut self,
1282        unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1283        diff: StateDiff,
1284        _retractions: &mut InProgressRetractions,
1285    ) {
1286        match diff {
1287            StateDiff::Addition => {
1288                let newly_inserted = self
1289                    .storage_metadata
1290                    .unfinalized_shards
1291                    .insert(unfinalized_shard.shard);
1292                assert!(
1293                    newly_inserted,
1294                    "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1295                );
1296            }
1297            StateDiff::Retraction => {
1298                let removed = self
1299                    .storage_metadata
1300                    .unfinalized_shards
1301                    .remove(&unfinalized_shard.shard);
1302                assert!(
1303                    removed,
1304                    "retraction does not match existing value: {unfinalized_shard:?}"
1305                );
1306            }
1307        }
1308    }
1309
1310    /// Generate a list of `BuiltinTableUpdate`s that correspond to a list of updates made to the
1311    /// durable catalog.
1312    #[instrument]
1313    pub(crate) fn generate_builtin_table_updates(
1314        &self,
1315        updates: Vec<StateUpdate>,
1316    ) -> Vec<BuiltinTableUpdate> {
1317        let mut builtin_table_updates = Vec::new();
1318        for StateUpdate { kind, ts: _, diff } in updates {
1319            let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1320            let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1321            builtin_table_updates.extend(builtin_table_update);
1322        }
1323        builtin_table_updates
1324    }
1325
1326    /// Generate a list of `BuiltinTableUpdate`s that correspond to a single update made to the
1327    /// durable catalog.
1328    #[instrument(level = "debug")]
1329    pub(crate) fn generate_builtin_table_update(
1330        &self,
1331        kind: StateUpdateKind,
1332        diff: StateDiff,
1333    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1334        let diff = diff.into();
1335        match kind {
1336            StateUpdateKind::Role(role) => {
1337                let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1338                for group_id in role.membership.map.keys() {
1339                    builtin_table_updates
1340                        .push(self.pack_role_members_update(*group_id, role.id, diff))
1341                }
1342                builtin_table_updates
1343            }
1344            StateUpdateKind::RoleAuth(role_auth) => {
1345                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1346            }
1347            StateUpdateKind::Database(database) => {
1348                vec![self.pack_database_update(&database.id, diff)]
1349            }
1350            StateUpdateKind::Schema(schema) => {
1351                let db_spec = schema.database_id.into();
1352                vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1353            }
1354            StateUpdateKind::DefaultPrivilege(default_privilege) => {
1355                vec![self.pack_default_privileges_update(
1356                    &default_privilege.object,
1357                    &default_privilege.acl_item.grantee,
1358                    &default_privilege.acl_item.acl_mode,
1359                    diff,
1360                )]
1361            }
1362            StateUpdateKind::SystemPrivilege(system_privilege) => {
1363                vec![self.pack_system_privileges_update(system_privilege, diff)]
1364            }
1365            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1366            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1367            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1368                self.pack_item_update(introspection_source_index.item_id, diff)
1369            }
1370            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1371                cluster_replica.cluster_id,
1372                &cluster_replica.name,
1373                diff,
1374            ),
1375            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1376                // Runtime-alterable system objects have real entries in the
1377                // items collection and so get handled through the normal
1378                // `StateUpdateKind::Item`.`
1379                if !system_object_mapping.unique_identifier.runtime_alterable() {
1380                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1381                } else {
1382                    vec![]
1383                }
1384            }
1385            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1386            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1387            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1388                comment.object_id,
1389                comment.sub_component,
1390                &comment.comment,
1391                diff,
1392            )],
1393            StateUpdateKind::SourceReferences(source_references) => {
1394                self.pack_source_references_update(&source_references, diff)
1395            }
1396            StateUpdateKind::AuditLog(audit_log) => {
1397                vec![
1398                    self.pack_audit_log_update(&audit_log.event, diff)
1399                        .expect("could not pack audit log update"),
1400                ]
1401            }
1402            StateUpdateKind::NetworkPolicy(policy) => self
1403                .pack_network_policy_update(&policy.id, diff)
1404                .expect("could not pack audit log update"),
1405            StateUpdateKind::StorageCollectionMetadata(_)
1406            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1407        }
1408    }
1409
1410    fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1411        self.entry_by_id
1412            .get_mut(id)
1413            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1414    }
1415
1416    fn get_schema_mut(
1417        &mut self,
1418        database_spec: &ResolvedDatabaseSpecifier,
1419        schema_spec: &SchemaSpecifier,
1420        conn_id: &ConnectionId,
1421    ) -> &mut Schema {
1422        // Keep in sync with `get_schemas`
1423        match (database_spec, schema_spec) {
1424            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1425                .temporary_schemas
1426                .get_mut(conn_id)
1427                .expect("catalog out of sync"),
1428            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1429                .ambient_schemas_by_id
1430                .get_mut(id)
1431                .expect("catalog out of sync"),
1432            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1433                .database_by_id
1434                .get_mut(database_id)
1435                .expect("catalog out of sync")
1436                .schemas_by_id
1437                .get_mut(schema_id)
1438                .expect("catalog out of sync"),
1439            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1440                unreachable!("temporary schemas are in the ambient database")
1441            }
1442        }
1443    }
1444
1445    /// Install builtin views to the catalog. This is its own function so that views can be
1446    /// optimized in parallel.
1447    ///
1448    /// The implementation is similar to `apply_updates_for_bootstrap` and determines dependency
1449    /// problems by sniffing out specific errors and then retrying once those dependencies are
1450    /// complete. This doesn't work for everything (casts, function implementations) so we also need
    /// to have a bucket for everything at the end. Additionally, because this executes in parallel,
1452    /// we must maintain a completed set otherwise races could result in orphaned views languishing
1453    /// in awaiting with nothing retriggering the attempt.
1454    #[instrument(name = "catalog::parse_views")]
1455    async fn parse_builtin_views(
1456        state: &mut CatalogState,
1457        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
1458        retractions: &mut InProgressRetractions,
1459        local_expression_cache: &mut LocalExpressionCache,
1460    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1461        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
1462        let (updates, additions): (Vec<_>, Vec<_>) =
1463            builtin_views
1464                .into_iter()
1465                .partition_map(|(view, item_id, gid)| {
1466                    match retractions.system_object_mappings.remove(&item_id) {
1467                        Some(entry) => Either::Left(entry),
1468                        None => Either::Right((view, item_id, gid)),
1469                    }
1470                });
1471
1472        for entry in updates {
1473            // This implies that we updated the fingerprint for some builtin view. The retraction
1474            // was parsed, planned, and optimized using the compiled in definition, not the
1475            // definition from a previous version. So we can just stick the old entry back into the
1476            // catalog.
1477            let item_id = entry.id();
1478            state.insert_entry(entry);
1479            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
1480        }
1481
1482        let mut handles = Vec::new();
1483        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
1484            BTreeMap::new();
1485        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
1486        // Some errors are due to the implementation of casts or SQL functions that depend on some
1487        // view. Instead of figuring out the exact view dependency, delay these until the end.
1488        let mut awaiting_all = Vec::new();
1489        // Completed views, needed to avoid race conditions.
1490        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
1491        let mut completed_names: BTreeSet<String> = BTreeSet::new();
1492
1493        // Avoid some reference lifetime issues by not passing `builtin` into the spawned task.
1494        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
1495            .into_iter()
1496            .map(|(view, item_id, gid)| (item_id, (view, gid)))
1497            .collect();
1498        let item_ids: Vec<_> = views.keys().copied().collect();
1499
1500        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
1501        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
1502            if handles.is_empty() && ready.is_empty() {
1503                // Enqueue the views that were waiting for all the others.
1504                ready.extend(awaiting_all.drain(..));
1505            }
1506
1507            // Spawn tasks for all ready views.
1508            if !ready.is_empty() {
1509                let spawn_state = Arc::new(state.clone());
1510                while let Some(id) = ready.pop_front() {
1511                    let (view, global_id) = views.get(&id).expect("must exist");
1512                    let global_id = *global_id;
1513                    let create_sql = view.create_sql();
1514                    // Views can't be versioned.
1515                    let versions = BTreeMap::new();
1516
1517                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
1518                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
1519                    let task_state = Arc::clone(&spawn_state);
1520                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1521                    let handle = mz_ore::task::spawn_blocking(
1522                        || "parse view",
1523                        move || {
1524                            span.in_scope(|| {
1525                                let res = task_state.parse_item_inner(
1526                                    global_id,
1527                                    &create_sql,
1528                                    &versions,
1529                                    None,
1530                                    false,
1531                                    None,
1532                                    cached_expr,
1533                                    None,
1534                                );
1535                                (id, global_id, res)
1536                            })
1537                        },
1538                    );
1539                    handles.push(handle);
1540                }
1541            }
1542
1543            // Wait for a view to be ready.
1544            let (selected, _idx, remaining) = future::select_all(handles).await;
1545            handles = remaining;
1546            let (id, global_id, res) = selected;
1547            let mut insert_cached_expr = |cached_expr| {
1548                if let Some(cached_expr) = cached_expr {
1549                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
1550                }
1551            };
1552            match res {
1553                Ok((item, uncached_expr)) => {
1554                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1555                        local_expression_cache.insert_uncached_expression(
1556                            global_id,
1557                            uncached_expr,
1558                            optimizer_features,
1559                        );
1560                    }
1561                    // Add item to catalog.
1562                    let (view, _gid) = views.remove(&id).expect("must exist");
1563                    let schema_id = state
1564                        .ambient_schemas_by_name
1565                        .get(view.schema)
1566                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
1567                    let qname = QualifiedItemName {
1568                        qualifiers: ItemQualifiers {
1569                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1570                            schema_spec: SchemaSpecifier::Id(*schema_id),
1571                        },
1572                        item: view.name.into(),
1573                    };
1574                    let mut acl_items = vec![rbac::owner_privilege(
1575                        mz_sql::catalog::ObjectType::View,
1576                        MZ_SYSTEM_ROLE_ID,
1577                    )];
1578                    acl_items.extend_from_slice(&view.access);
1579
1580                    state.insert_item(
1581                        id,
1582                        view.oid,
1583                        qname,
1584                        item,
1585                        MZ_SYSTEM_ROLE_ID,
1586                        PrivilegeMap::from_mz_acl_items(acl_items),
1587                    );
1588
1589                    // Enqueue any items waiting on this dependency.
1590                    let mut resolved_dependent_items = Vec::new();
1591                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
1592                        resolved_dependent_items.extend(dependent_items);
1593                    }
1594                    let entry = state.get_entry(&id);
1595                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
1596                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
1597                        resolved_dependent_items.extend(dependent_items);
1598                    }
1599                    ready.extend(resolved_dependent_items);
1600
1601                    completed_ids.insert(id);
1602                    completed_names.insert(full_name);
1603                }
1604                // If we were missing a dependency, wait for it to be added.
1605                Err((
1606                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
1607                    cached_expr,
1608                )) => {
1609                    insert_cached_expr(cached_expr);
1610                    if completed_ids.contains(&missing_dep) {
1611                        ready.push_back(id);
1612                    } else {
1613                        awaiting_id_dependencies
1614                            .entry(missing_dep)
1615                            .or_default()
1616                            .push(id);
1617                    }
1618                }
1619                // If we were missing a dependency, wait for it to be added.
1620                Err((
1621                    AdapterError::PlanError(plan::PlanError::Catalog(
1622                        SqlCatalogError::UnknownItem(missing_dep),
1623                    )),
1624                    cached_expr,
1625                )) => {
1626                    insert_cached_expr(cached_expr);
1627                    match CatalogItemId::from_str(&missing_dep) {
1628                        Ok(missing_dep) => {
1629                            if completed_ids.contains(&missing_dep) {
1630                                ready.push_back(id);
1631                            } else {
1632                                awaiting_id_dependencies
1633                                    .entry(missing_dep)
1634                                    .or_default()
1635                                    .push(id);
1636                            }
1637                        }
1638                        Err(_) => {
1639                            if completed_names.contains(&missing_dep) {
1640                                ready.push_back(id);
1641                            } else {
1642                                awaiting_name_dependencies
1643                                    .entry(missing_dep)
1644                                    .or_default()
1645                                    .push(id);
1646                            }
1647                        }
1648                    }
1649                }
1650                Err((
1651                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
1652                    cached_expr,
1653                )) => {
1654                    insert_cached_expr(cached_expr);
1655                    awaiting_all.push(id);
1656                }
1657                Err((e, _)) => {
1658                    let (bad_view, _gid) = views.get(&id).expect("must exist");
1659                    panic!(
1660                        "internal error: failed to load bootstrap view:\n\
1661                            {name}\n\
1662                            error:\n\
1663                            {e:?}\n\n\
1664                            Make sure that the schema name is specified in the builtin view's create sql statement.
1665                            ",
1666                        name = bad_view.name,
1667                    )
1668                }
1669            }
1670        }
1671
1672        assert!(awaiting_id_dependencies.is_empty());
1673        assert!(
1674            awaiting_name_dependencies.is_empty(),
1675            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
1676        );
1677        assert!(awaiting_all.is_empty());
1678        assert!(views.is_empty());
1679
1680        // Generate a builtin table update for all the new views.
1681        builtin_table_updates.extend(
1682            item_ids
1683                .into_iter()
1684                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
1685        );
1686
1687        builtin_table_updates
1688    }
1689
1690    /// Associates a name, `CatalogItemId`, and entry.
1691    fn insert_entry(&mut self, entry: CatalogEntry) {
1692        if !entry.id.is_system() {
1693            if let Some(cluster_id) = entry.item.cluster_id() {
1694                self.clusters_by_id
1695                    .get_mut(&cluster_id)
1696                    .expect("catalog out of sync")
1697                    .bound_objects
1698                    .insert(entry.id);
1699            };
1700        }
1701
1702        for u in entry.references().items() {
1703            match self.entry_by_id.get_mut(u) {
1704                Some(metadata) => metadata.referenced_by.push(entry.id()),
1705                None => panic!(
1706                    "Catalog: missing dependent catalog item {} while installing {}",
1707                    &u,
1708                    self.resolve_full_name(entry.name(), entry.conn_id())
1709                ),
1710            }
1711        }
1712        for u in entry.uses() {
1713            // Ignore self for self-referential tasks (e.g. Continual Tasks), if
1714            // present.
1715            if u == entry.id() {
1716                continue;
1717            }
1718            match self.entry_by_id.get_mut(&u) {
1719                Some(metadata) => metadata.used_by.push(entry.id()),
1720                None => panic!(
1721                    "Catalog: missing dependent catalog item {} while installing {}",
1722                    &u,
1723                    self.resolve_full_name(entry.name(), entry.conn_id())
1724                ),
1725            }
1726        }
1727        for gid in entry.item.global_ids() {
1728            self.entry_by_global_id.insert(gid, entry.id());
1729        }
1730        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1731        // Lazily create the temporary schema if this is a temporary item and the schema
1732        // doesn't exist yet.
1733        if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
1734            && !self.temporary_schemas.contains_key(conn_id)
1735        {
1736            self.create_temporary_schema(conn_id, entry.owner_id)
1737                .expect("failed to create temporary schema");
1738        }
1739        let schema = self.get_schema_mut(
1740            &entry.name().qualifiers.database_spec,
1741            &entry.name().qualifiers.schema_spec,
1742            conn_id,
1743        );
1744
1745        let prev_id = match entry.item() {
1746            CatalogItem::Func(_) => schema
1747                .functions
1748                .insert(entry.name().item.clone(), entry.id()),
1749            CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1750            _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1751        };
1752
1753        assert!(
1754            prev_id.is_none(),
1755            "builtin name collision on {:?}",
1756            entry.name().item.clone()
1757        );
1758
1759        self.entry_by_id.insert(entry.id(), entry.clone());
1760    }
1761
1762    /// Associates a name, [`CatalogItemId`], and entry.
1763    fn insert_item(
1764        &mut self,
1765        id: CatalogItemId,
1766        oid: u32,
1767        name: QualifiedItemName,
1768        item: CatalogItem,
1769        owner_id: RoleId,
1770        privileges: PrivilegeMap,
1771    ) {
1772        let entry = CatalogEntry {
1773            item,
1774            name,
1775            id,
1776            oid,
1777            used_by: Vec::new(),
1778            referenced_by: Vec::new(),
1779            owner_id,
1780            privileges,
1781        };
1782
1783        self.insert_entry(entry);
1784    }
1785
1786    #[mz_ore::instrument(level = "trace")]
1787    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1788        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1789        for u in metadata.references().items() {
1790            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1791                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1792            }
1793        }
1794        for u in metadata.uses() {
1795            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1796                dep_metadata.used_by.retain(|u| *u != metadata.id())
1797            }
1798        }
1799        for gid in metadata.global_ids() {
1800            self.entry_by_global_id.remove(&gid);
1801        }
1802
1803        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1804        let schema = self.get_schema_mut(
1805            &metadata.name().qualifiers.database_spec,
1806            &metadata.name().qualifiers.schema_spec,
1807            conn_id,
1808        );
1809        if metadata.item_type() == CatalogItemType::Type {
1810            schema
1811                .types
1812                .remove(&metadata.name().item)
1813                .expect("catalog out of sync");
1814        } else {
1815            // Functions would need special handling, but we don't yet support
1816            // dropping functions.
1817            assert_ne!(metadata.item_type(), CatalogItemType::Func);
1818
1819            schema
1820                .items
1821                .remove(&metadata.name().item)
1822                .expect("catalog out of sync");
1823        };
1824
1825        if !id.is_system() {
1826            if let Some(cluster_id) = metadata.item().cluster_id() {
1827                assert!(
1828                    self.clusters_by_id
1829                        .get_mut(&cluster_id)
1830                        .expect("catalog out of sync")
1831                        .bound_objects
1832                        .remove(&id),
1833                    "catalog out of sync"
1834                );
1835            }
1836        }
1837
1838        metadata
1839    }
1840
1841    fn insert_introspection_source_index(
1842        &mut self,
1843        cluster_id: ClusterId,
1844        log: &'static BuiltinLog,
1845        item_id: CatalogItemId,
1846        global_id: GlobalId,
1847        oid: u32,
1848    ) {
1849        let (index_name, index) =
1850            self.create_introspection_source_index(cluster_id, log, global_id);
1851        self.insert_item(
1852            item_id,
1853            oid,
1854            index_name,
1855            index,
1856            MZ_SYSTEM_ROLE_ID,
1857            PrivilegeMap::default(),
1858        );
1859    }
1860
1861    fn create_introspection_source_index(
1862        &self,
1863        cluster_id: ClusterId,
1864        log: &'static BuiltinLog,
1865        global_id: GlobalId,
1866    ) -> (QualifiedItemName, CatalogItem) {
1867        let source_name = FullItemName {
1868            database: RawDatabaseSpecifier::Ambient,
1869            schema: log.schema.into(),
1870            item: log.name.into(),
1871        };
1872        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1873        let mut index_name = QualifiedItemName {
1874            qualifiers: ItemQualifiers {
1875                database_spec: ResolvedDatabaseSpecifier::Ambient,
1876                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1877            },
1878            item: index_name.clone(),
1879        };
1880        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1881        let index_item_name = index_name.item.clone();
1882        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1883        let index = CatalogItem::Index(Index {
1884            global_id,
1885            on: log_global_id,
1886            keys: log
1887                .variant
1888                .index_by()
1889                .into_iter()
1890                .map(MirScalarExpr::column)
1891                .collect(),
1892            create_sql: index_sql(
1893                index_item_name,
1894                cluster_id,
1895                source_name,
1896                &log.variant.desc(),
1897                &log.variant.index_by(),
1898            ),
1899            conn_id: None,
1900            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1901            cluster_id,
1902            is_retained_metrics_object: false,
1903            custom_logical_compaction_window: None,
1904        });
1905        (index_name, index)
1906    }
1907
1908    /// Insert system configuration `name` with `value`.
1909    ///
1910    /// Return a `bool` value indicating whether the configuration was modified
1911    /// by the call.
1912    fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1913        Ok(self.system_configuration.set(name, value)?)
1914    }
1915
1916    /// Reset system configuration `name`.
1917    ///
1918    /// Return a `bool` value indicating whether the configuration was modified
1919    /// by the call.
1920    fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1921        Ok(self.system_configuration.reset(name)?)
1922    }
1923}
1924
1925/// Sort [`StateUpdate`]s in dependency order.
1926///
1927/// # Panics
1928///
1929/// This function assumes that all provided `updates` have the same timestamp
1930/// and will panic otherwise.
1931fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1932    fn push_update<T>(
1933        update: T,
1934        diff: StateDiff,
1935        retractions: &mut Vec<T>,
1936        additions: &mut Vec<T>,
1937    ) {
1938        match diff {
1939            StateDiff::Retraction => retractions.push(update),
1940            StateDiff::Addition => additions.push(update),
1941        }
1942    }
1943
1944    soft_assert_no_log!(
1945        updates.iter().map(|update| update.ts).all_equal(),
1946        "all timestamps should be equal: {updates:?}"
1947    );
1948
1949    // Partition updates by type so that we can weave different update types into the right spots.
1950    let mut pre_cluster_retractions = Vec::new();
1951    let mut pre_cluster_additions = Vec::new();
1952    let mut cluster_retractions = Vec::new();
1953    let mut cluster_additions = Vec::new();
1954    let mut builtin_item_updates = Vec::new();
1955    let mut item_retractions = Vec::new();
1956    let mut item_additions = Vec::new();
1957    let mut temp_item_retractions = Vec::new();
1958    let mut temp_item_additions = Vec::new();
1959    let mut post_item_retractions = Vec::new();
1960    let mut post_item_additions = Vec::new();
1961    for update in updates {
1962        let diff = update.diff.clone();
1963        match update.kind {
1964            StateUpdateKind::Role(_)
1965            | StateUpdateKind::RoleAuth(_)
1966            | StateUpdateKind::Database(_)
1967            | StateUpdateKind::Schema(_)
1968            | StateUpdateKind::DefaultPrivilege(_)
1969            | StateUpdateKind::SystemPrivilege(_)
1970            | StateUpdateKind::SystemConfiguration(_)
1971            | StateUpdateKind::NetworkPolicy(_) => push_update(
1972                update,
1973                diff,
1974                &mut pre_cluster_retractions,
1975                &mut pre_cluster_additions,
1976            ),
1977            StateUpdateKind::Cluster(_)
1978            | StateUpdateKind::IntrospectionSourceIndex(_)
1979            | StateUpdateKind::ClusterReplica(_) => push_update(
1980                update,
1981                diff,
1982                &mut cluster_retractions,
1983                &mut cluster_additions,
1984            ),
1985            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1986                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
1987            }
1988            StateUpdateKind::TemporaryItem(item) => push_update(
1989                (item, update.ts, update.diff),
1990                diff,
1991                &mut temp_item_retractions,
1992                &mut temp_item_additions,
1993            ),
1994            StateUpdateKind::Item(item) => push_update(
1995                (item, update.ts, update.diff),
1996                diff,
1997                &mut item_retractions,
1998                &mut item_additions,
1999            ),
2000            StateUpdateKind::Comment(_)
2001            | StateUpdateKind::SourceReferences(_)
2002            | StateUpdateKind::AuditLog(_)
2003            | StateUpdateKind::StorageCollectionMetadata(_)
2004            | StateUpdateKind::UnfinalizedShard(_) => push_update(
2005                update,
2006                diff,
2007                &mut post_item_retractions,
2008                &mut post_item_additions,
2009            ),
2010        }
2011    }
2012
2013    // Sort builtin item updates by dependency.
2014    let builtin_item_updates = builtin_item_updates
2015        .into_iter()
2016        .map(|(system_object_mapping, ts, diff)| {
2017            let idx = BUILTIN_LOOKUP
2018                .get(&system_object_mapping.description)
2019                .expect("missing builtin")
2020                .0;
2021            (idx, system_object_mapping, ts, diff)
2022        })
2023        .sorted_by_key(|(idx, _, _, _)| *idx)
2024        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2025
2026    // Further partition builtin item updates.
2027    let mut other_builtin_retractions = Vec::new();
2028    let mut other_builtin_additions = Vec::new();
2029    let mut builtin_index_retractions = Vec::new();
2030    let mut builtin_index_additions = Vec::new();
2031    for (builtin_item_update, ts, diff) in builtin_item_updates {
2032        match &builtin_item_update.description.object_type {
2033            CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
2034                StateUpdate {
2035                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2036                    ts,
2037                    diff,
2038                },
2039                diff,
2040                &mut builtin_index_retractions,
2041                &mut builtin_index_additions,
2042            ),
2043            CatalogItemType::Table
2044            | CatalogItemType::Source
2045            | CatalogItemType::Sink
2046            | CatalogItemType::View
2047            | CatalogItemType::MaterializedView
2048            | CatalogItemType::Type
2049            | CatalogItemType::Func
2050            | CatalogItemType::Secret
2051            | CatalogItemType::Connection => push_update(
2052                StateUpdate {
2053                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2054                    ts,
2055                    diff,
2056                },
2057                diff,
2058                &mut other_builtin_retractions,
2059                &mut other_builtin_additions,
2060            ),
2061        }
2062    }
2063
2064    /// Sort `CONNECTION` items.
2065    ///
2066    /// `CONNECTION`s can depend on one another, e.g. a `KAFKA CONNECTION` contains
2067    /// a list of `BROKERS` and these broker definitions can themselves reference other
2068    /// connections. This `BROKERS` list can also be `ALTER`ed and thus it's possible
2069    /// for a connection to depend on another with an ID greater than its own.
2070    fn sort_connections(connections: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2071        let mut topo: BTreeMap<
2072            (mz_catalog::durable::Item, Timestamp, StateDiff),
2073            BTreeSet<CatalogItemId>,
2074        > = BTreeMap::default();
2075        let existing_connections: BTreeSet<_> = connections.iter().map(|item| item.0.id).collect();
2076
2077        // Initialize our set of topological sort.
2078        tracing::debug!(?connections, "sorting connections");
2079        for (connection, ts, diff) in connections.drain(..) {
2080            let statement = mz_sql::parse::parse(&connection.create_sql)
2081                .expect("valid CONNECTION create_sql")
2082                .into_element()
2083                .ast;
2084            let mut dependencies = mz_sql::names::dependencies(&statement)
2085                .expect("failed to find dependencies of CONNECTION");
2086            // Be defensive and remove any possible self references.
2087            dependencies.remove(&connection.id);
2088            // It's possible we're applying updates to a connection where the
2089            // dependency already exists and thus it's not in `connections`.
2090            dependencies.retain(|dep| existing_connections.contains(dep));
2091
2092            // Be defensive and ensure we're not clobbering any items.
2093            assert_none!(topo.insert((connection, ts, diff), dependencies));
2094        }
2095        tracing::debug!(?topo, ?existing_connections, "built topological sort");
2096
2097        // Do a topological sort, pushing back into the provided Vec.
2098        while !topo.is_empty() {
2099            // Get all of the connections with no dependencies.
2100            let no_deps: Vec<_> = topo
2101                .iter()
2102                .filter_map(|(item, deps)| {
2103                    if deps.is_empty() {
2104                        Some(item.clone())
2105                    } else {
2106                        None
2107                    }
2108                })
2109                .collect();
2110
2111            // Cycle in our graph!
2112            if no_deps.is_empty() {
2113                panic!("programming error, cycle in Connections");
2114            }
2115
2116            // Process all of the items with no dependencies.
2117            for item in no_deps {
2118                // Remove the item from our topological sort.
2119                topo.remove(&item);
2120                // Remove this item from anything that depends on it.
2121                topo.values_mut().for_each(|deps| {
2122                    deps.remove(&item.0.id);
2123                });
2124                // Push it back into our list as "completed".
2125                connections.push(item);
2126            }
2127        }
2128    }
2129
2130    /// Sort item updates by dependency.
2131    ///
2132    /// First we group items into groups that are totally ordered by dependency. For example, when
2133    /// sorting all items by dependency we know that all tables can come after all sources, because
2134    /// a source can never depend on a table. Within these groups, the ID order matches the
2135    /// dependency order.
2136    ///
2137    /// It used to be the case that the ID order of ALL items matched the dependency order. However,
2138    /// certain migrations shuffled item IDs around s.t. this was no longer true. A much better
2139    /// approach would be to investigate each item, discover their exact dependencies, and then
2140    /// perform a topological sort. This is non-trivial because we only have the CREATE SQL of each
2141    /// item here. Within the SQL the dependent items are sometimes referred to by ID and sometimes
2142    /// referred to by name.
2143    ///
2144    /// The logic of this function should match [`sort_temp_item_updates`].
2145    fn sort_item_updates(
2146        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2147    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2148        // Partition items into groups s.t. each item in one group has a predefined order with all
2149        // items in other groups. For example, all sinks are ordered greater than all tables.
2150        let mut types = Vec::new();
2151        // N.B. Functions can depend on system tables, but not user tables.
2152        // TODO(udf): This will change when UDFs are supported.
2153        let mut funcs = Vec::new();
2154        let mut secrets = Vec::new();
2155        let mut connections = Vec::new();
2156        let mut sources = Vec::new();
2157        let mut tables = Vec::new();
2158        let mut derived_items = Vec::new();
2159        let mut sinks = Vec::new();
2160        let mut continual_tasks = Vec::new();
2161
2162        for update in item_updates {
2163            match update.0.item_type() {
2164                CatalogItemType::Type => types.push(update),
2165                CatalogItemType::Func => funcs.push(update),
2166                CatalogItemType::Secret => secrets.push(update),
2167                CatalogItemType::Connection => connections.push(update),
2168                CatalogItemType::Source => sources.push(update),
2169                CatalogItemType::Table => tables.push(update),
2170                CatalogItemType::View
2171                | CatalogItemType::MaterializedView
2172                | CatalogItemType::Index => derived_items.push(update),
2173                CatalogItemType::Sink => sinks.push(update),
2174                CatalogItemType::ContinualTask => continual_tasks.push(update),
2175            }
2176        }
2177
2178        // Within each group, sort by ID.
2179        for group in [
2180            &mut types,
2181            &mut funcs,
2182            &mut secrets,
2183            &mut sources,
2184            &mut tables,
2185            &mut derived_items,
2186            &mut sinks,
2187            &mut continual_tasks,
2188        ] {
2189            group.sort_by_key(|(item, _, _)| item.id);
2190        }
2191
2192        // HACK(parkmycar): Connections are special and can depend on one another. Additionally
2193        // connections can be `ALTER`ed and thus a `CONNECTION` can depend on another whose ID
2194        // is greater than its own.
2195        sort_connections(&mut connections);
2196
2197        iter::empty()
2198            .chain(types)
2199            .chain(funcs)
2200            .chain(secrets)
2201            .chain(connections)
2202            .chain(sources)
2203            .chain(tables)
2204            .chain(derived_items)
2205            .chain(sinks)
2206            .chain(continual_tasks)
2207            .collect()
2208    }
2209
2210    let item_retractions = sort_item_updates(item_retractions);
2211    let item_additions = sort_item_updates(item_additions);
2212
2213    /// Sort temporary item updates by dependency.
2214    ///
2215    /// The logic of this function should match [`sort_item_updates`].
2216    fn sort_temp_item_updates(
2217        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2218    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2219        // Partition items into groups s.t. each item in one group has a predefined order with all
2220        // items in other groups. For example, all sinks are ordered greater than all tables.
2221        let mut types = Vec::new();
2222        // N.B. Functions can depend on system tables, but not user tables.
2223        let mut funcs = Vec::new();
2224        let mut secrets = Vec::new();
2225        let mut connections = Vec::new();
2226        let mut sources = Vec::new();
2227        let mut tables = Vec::new();
2228        let mut derived_items = Vec::new();
2229        let mut sinks = Vec::new();
2230        let mut continual_tasks = Vec::new();
2231
2232        for update in temp_item_updates {
2233            match update.0.item_type() {
2234                CatalogItemType::Type => types.push(update),
2235                CatalogItemType::Func => funcs.push(update),
2236                CatalogItemType::Secret => secrets.push(update),
2237                CatalogItemType::Connection => connections.push(update),
2238                CatalogItemType::Source => sources.push(update),
2239                CatalogItemType::Table => tables.push(update),
2240                CatalogItemType::View
2241                | CatalogItemType::MaterializedView
2242                | CatalogItemType::Index => derived_items.push(update),
2243                CatalogItemType::Sink => sinks.push(update),
2244                CatalogItemType::ContinualTask => continual_tasks.push(update),
2245            }
2246        }
2247
2248        // Within each group, sort by ID.
2249        for group in [
2250            &mut types,
2251            &mut funcs,
2252            &mut secrets,
2253            &mut connections,
2254            &mut sources,
2255            &mut tables,
2256            &mut derived_items,
2257            &mut sinks,
2258            &mut continual_tasks,
2259        ] {
2260            group.sort_by_key(|(item, _, _)| item.id);
2261        }
2262
2263        iter::empty()
2264            .chain(types)
2265            .chain(funcs)
2266            .chain(secrets)
2267            .chain(connections)
2268            .chain(sources)
2269            .chain(tables)
2270            .chain(derived_items)
2271            .chain(sinks)
2272            .chain(continual_tasks)
2273            .collect()
2274    }
2275    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2276    let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2277
2278    /// Merge sorted temporary and non-temp items.
2279    fn merge_item_updates(
2280        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2281        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2282    ) -> Vec<StateUpdate> {
2283        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2284
2285        while let (Some((item, _, _)), Some((temp_item, _, _))) =
2286            (item_updates.front(), temp_item_updates.front())
2287        {
2288            if item.id < temp_item.id {
2289                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2290                state_updates.push(StateUpdate {
2291                    kind: StateUpdateKind::Item(item),
2292                    ts,
2293                    diff,
2294                });
2295            } else if item.id > temp_item.id {
2296                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2297                state_updates.push(StateUpdate {
2298                    kind: StateUpdateKind::TemporaryItem(temp_item),
2299                    ts,
2300                    diff,
2301                });
2302            } else {
2303                unreachable!(
2304                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2305                );
2306            }
2307        }
2308
2309        while let Some((item, ts, diff)) = item_updates.pop_front() {
2310            state_updates.push(StateUpdate {
2311                kind: StateUpdateKind::Item(item),
2312                ts,
2313                diff,
2314            });
2315        }
2316
2317        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2318            state_updates.push(StateUpdate {
2319                kind: StateUpdateKind::TemporaryItem(temp_item),
2320                ts,
2321                diff,
2322            });
2323        }
2324
2325        state_updates
2326    }
2327    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2328    let item_additions = merge_item_updates(item_additions, temp_item_additions);
2329
2330    // Put everything back together.
2331    iter::empty()
2332        // All retractions must be reversed.
2333        .chain(post_item_retractions.into_iter().rev())
2334        .chain(item_retractions.into_iter().rev())
2335        .chain(builtin_index_retractions.into_iter().rev())
2336        .chain(cluster_retractions.into_iter().rev())
2337        .chain(other_builtin_retractions.into_iter().rev())
2338        .chain(pre_cluster_retractions.into_iter().rev())
2339        .chain(pre_cluster_additions)
2340        .chain(other_builtin_additions)
2341        .chain(cluster_additions)
2342        .chain(builtin_index_additions)
2343        .chain(item_additions)
2344        .chain(post_item_additions)
2345        .collect()
2346}
2347
2348/// Groups of updates of certain types are applied in batches to improve
2349/// performance. A constraint is that updates must be applied in order. This
2350/// process is modeled as a state machine that batches then applies groups of
2351/// updates.
2352enum ApplyState {
2353    /// Additions of builtin views.
2354    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
2355    /// Item updates that aren't builtin view additions.
2356    ///
2357    /// This contains all updates whose application requires calling
2358    /// `parse_item` and thus toggling the `enable_for_item_parsing` feature
2359    /// flags.
2360    Items(Vec<StateUpdate>),
2361    /// All other updates.
2362    Updates(Vec<StateUpdate>),
2363}
2364
2365impl ApplyState {
2366    fn new(update: StateUpdate) -> Self {
2367        use StateUpdateKind::*;
2368        match &update.kind {
2369            SystemObjectMapping(som)
2370                if som.description.object_type == CatalogItemType::View
2371                    && update.diff == StateDiff::Addition =>
2372            {
2373                let view_addition = lookup_builtin_view_addition(som.clone());
2374                Self::BuiltinViewAdditions(vec![view_addition])
2375            }
2376
2377            IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
2378                Self::Items(vec![update])
2379            }
2380
2381            Role(_)
2382            | RoleAuth(_)
2383            | Database(_)
2384            | Schema(_)
2385            | DefaultPrivilege(_)
2386            | SystemPrivilege(_)
2387            | SystemConfiguration(_)
2388            | Cluster(_)
2389            | NetworkPolicy(_)
2390            | ClusterReplica(_)
2391            | SourceReferences(_)
2392            | Comment(_)
2393            | AuditLog(_)
2394            | StorageCollectionMetadata(_)
2395            | UnfinalizedShard(_) => Self::Updates(vec![update]),
2396        }
2397    }
2398
2399    /// Apply all updates that have been batched in `self`.
2400    ///
2401    /// We make sure to enable all "enable_for_item_parsing" feature flags when applying item
2402    /// updates during bootstrap. See [`CatalogState::with_enable_for_item_parsing`] for more
2403    /// details.
2404    async fn apply(
2405        self,
2406        state: &mut CatalogState,
2407        retractions: &mut InProgressRetractions,
2408        local_expression_cache: &mut LocalExpressionCache,
2409    ) -> (
2410        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2411        Vec<ParsedStateUpdate>,
2412    ) {
2413        match self {
2414            Self::BuiltinViewAdditions(builtin_view_additions) => {
2415                let restore = state.system_configuration.clone();
2416                state.system_configuration.enable_for_item_parsing();
2417                let builtin_table_updates = CatalogState::parse_builtin_views(
2418                    state,
2419                    builtin_view_additions,
2420                    retractions,
2421                    local_expression_cache,
2422                )
2423                .await;
2424                state.system_configuration = restore;
2425                (builtin_table_updates, Vec::new())
2426            }
2427            Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
2428                state
2429                    .apply_updates_inner(updates, retractions, local_expression_cache)
2430                    .expect("corrupt catalog")
2431            }),
2432            Self::Updates(updates) => state
2433                .apply_updates_inner(updates, retractions, local_expression_cache)
2434                .expect("corrupt catalog"),
2435        }
2436    }
2437
2438    async fn step(
2439        self,
2440        next: Self,
2441        state: &mut CatalogState,
2442        retractions: &mut InProgressRetractions,
2443        local_expression_cache: &mut LocalExpressionCache,
2444    ) -> (
2445        Self,
2446        (
2447            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2448            Vec<ParsedStateUpdate>,
2449        ),
2450    ) {
2451        match (self, next) {
2452            (
2453                Self::BuiltinViewAdditions(mut builtin_view_additions),
2454                Self::BuiltinViewAdditions(next_builtin_view_additions),
2455            ) => {
2456                // Continue batching builtin view additions.
2457                builtin_view_additions.extend(next_builtin_view_additions);
2458                (
2459                    Self::BuiltinViewAdditions(builtin_view_additions),
2460                    (Vec::new(), Vec::new()),
2461                )
2462            }
2463            (Self::Items(mut updates), Self::Items(next_updates)) => {
2464                // Continue batching item updates.
2465                updates.extend(next_updates);
2466                (Self::Items(updates), (Vec::new(), Vec::new()))
2467            }
2468            (Self::Updates(mut updates), Self::Updates(next_updates)) => {
2469                // Continue batching updates.
2470                updates.extend(next_updates);
2471                (Self::Updates(updates), (Vec::new(), Vec::new()))
2472            }
2473            (apply_state, next_apply_state) => {
2474                // Apply the current batch and start batching new apply state.
2475                let updates = apply_state
2476                    .apply(state, retractions, local_expression_cache)
2477                    .await;
2478                (next_apply_state, updates)
2479            }
2480        }
2481    }
2482}
2483
2484/// Helper method to updated inverted lookup maps. The keys are generally names and the values are
2485/// generally IDs.
2486///
2487/// Importantly, when retracting it's expected that the existing value will match `value` exactly.
2488fn apply_inverted_lookup<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: V, diff: StateDiff)
2489where
2490    K: Ord + Clone + Debug,
2491    V: PartialEq + Debug,
2492{
2493    match diff {
2494        StateDiff::Retraction => {
2495            let prev = map.remove(key);
2496            assert_eq!(
2497                prev,
2498                Some(value),
2499                "retraction does not match existing value: {key:?}"
2500            );
2501        }
2502        StateDiff::Addition => {
2503            let prev = map.insert(key.clone(), value);
2504            assert_eq!(
2505                prev, None,
2506                "values must be explicitly retracted before inserting a new value: {key:?}"
2507            );
2508        }
2509    }
2510}
2511
2512/// Helper method to update catalog state, that may need to be updated from a previously retracted
2513/// object.
2514fn apply_with_update<K, V, D>(
2515    map: &mut BTreeMap<K, V>,
2516    durable: D,
2517    key_fn: impl FnOnce(&D) -> K,
2518    diff: StateDiff,
2519    retractions: &mut BTreeMap<D::Key, V>,
2520) where
2521    K: Ord,
2522    V: UpdateFrom<D> + PartialEq + Debug,
2523    D: DurableType,
2524    D::Key: Ord,
2525{
2526    match diff {
2527        StateDiff::Retraction => {
2528            let mem_key = key_fn(&durable);
2529            let value = map
2530                .remove(&mem_key)
2531                .expect("retraction does not match existing value: {key:?}");
2532            let durable_key = durable.into_key_value().0;
2533            retractions.insert(durable_key, value);
2534        }
2535        StateDiff::Addition => {
2536            let mem_key = key_fn(&durable);
2537            let durable_key = durable.key();
2538            let value = match retractions.remove(&durable_key) {
2539                Some(mut retraction) => {
2540                    retraction.update_from(durable);
2541                    retraction
2542                }
2543                None => durable.into(),
2544            };
2545            let prev = map.insert(mem_key, value);
2546            assert_eq!(
2547                prev, None,
2548                "values must be explicitly retracted before inserting a new value"
2549            );
2550        }
2551    }
2552}
2553
2554/// Looks up a [`BuiltinView`] from a [`SystemObjectMapping`].
2555fn lookup_builtin_view_addition(
2556    mapping: SystemObjectMapping,
2557) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2558    let (_, builtin) = BUILTIN_LOOKUP
2559        .get(&mapping.description)
2560        .expect("missing builtin view");
2561    let Builtin::View(view) = builtin else {
2562        unreachable!("programming error, expected BuiltinView found {builtin:?}");
2563    };
2564
2565    (
2566        view,
2567        mapping.unique_identifier.catalog_id,
2568        mapping.unique_identifier.global_id,
2569    )
2570}