Skip to main content

mz_adapter/catalog/
apply.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to applying updates from a [`mz_catalog::durable::DurableCatalogState`] to a
11//! [`CatalogState`].
12
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25    BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28    ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29    SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
35    NetworkPolicy, Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36    TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
40use mz_controller_types::ClusterId;
41use mz_expr::MirScalarExpr;
42use mz_ore::collections::CollectionExt;
43use mz_ore::tracing::OpenTelemetryContext;
44use mz_ore::{assert_none, instrument, soft_assert_no_log};
45use mz_pgrepr::oid::INVALID_OID;
46use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
47use mz_repr::role_id::RoleId;
48use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
49use mz_sql::catalog::CatalogError as SqlCatalogError;
50use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
51use mz_sql::names::{
52    FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
53    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
54};
55use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
56use mz_sql::session::vars::{VarError, VarInput};
57use mz_sql::{plan, rbac};
58use mz_sql_parser::ast::Expr;
59use mz_storage_types::sources::Timeline;
60use tracing::{info_span, warn};
61
62use crate::AdapterError;
63use crate::catalog::state::LocalExpressionCache;
64use crate::catalog::{BuiltinTableUpdate, CatalogState};
65use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
66use crate::util::index_sql;
67
/// Maintains the state of retractions while applying catalog state updates for a single timestamp.
/// [`CatalogState`] maintains denormalized state for certain catalog objects. Updating an object
/// results in applying a retraction for that object followed by applying an addition for that
/// object. When applying those additions it can be extremely expensive to re-build that
/// denormalized state from scratch. To avoid that issue we stash the denormalized state from
/// retractions, so it can be used during additions.
///
/// Not all objects maintain denormalized state, so we only stash the retractions for the subset of
/// objects that maintain denormalized state.
// TODO(jkosh44) It might be simpler or more future proof to include all object types here, even if
// the update step is a no-op for certain types.
#[derive(Debug, Clone, Default)]
struct InProgressRetractions {
    /// Stash of in-memory roles, keyed by durable role key.
    roles: BTreeMap<RoleKey, Role>,
    /// Stash of in-memory role auth state, keyed by durable role-auth key.
    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
    /// Stash of in-memory databases, keyed by durable database key.
    databases: BTreeMap<DatabaseKey, Database>,
    /// Stash of in-memory schemas, keyed by durable schema key.
    schemas: BTreeMap<SchemaKey, Schema>,
    /// Stash of in-memory clusters, keyed by durable cluster key.
    clusters: BTreeMap<ClusterKey, Cluster>,
    /// Stash of in-memory network policies, keyed by durable network policy key.
    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
    /// Stash of catalog entries keyed by durable item key.
    // NOTE(review): presumably filled by the item update path, which is outside this excerpt.
    items: BTreeMap<ItemKey, CatalogEntry>,
    /// Stash of catalog entries keyed by catalog item ID.
    // NOTE(review): presumably filled by the temporary item update path, which is outside this
    // excerpt.
    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
    /// Entries dropped by retracted introspection source index updates, keyed by item ID. A later
    /// addition for the same item ID takes the entry back out and re-inserts it.
    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
    /// Entries dropped by retracted system object mapping updates, keyed by item ID. A later
    /// addition for the same item ID takes the entry back out and re-inserts it.
    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}
92
93impl CatalogState {
94    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
95    ///
96    /// Returns builtin table updates corresponding to the changes to catalog state.
97    #[must_use]
98    #[instrument]
99    pub(crate) async fn apply_updates(
100        &mut self,
101        updates: Vec<StateUpdate>,
102        local_expression_cache: &mut LocalExpressionCache,
103    ) -> (
104        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
105        Vec<ParsedStateUpdate>,
106    ) {
107        let mut builtin_table_updates = Vec::with_capacity(updates.len());
108        let mut catalog_updates = Vec::with_capacity(updates.len());
109
110        // First, consolidate updates. The code that applies parsed state
111        // updates _requires_ that the given updates are consolidated. There
112        // must be at most one addition and/or one retraction for a given item,
113        // as identified by that items ID type.
114        let updates = Self::consolidate_updates(updates);
115
116        // Apply updates in groups, according to their timestamps.
117        let mut groups: Vec<Vec<_>> = Vec::new();
118        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
119            // Bring the updates into the pseudo-topological order that we need
120            // for updating our in-memory state and generating builtin table
121            // updates.
122            let updates = sort_updates(updates.collect());
123            groups.push(updates);
124        }
125
126        for updates in groups {
127            let mut apply_state = ApplyState::Updates(Vec::new());
128            let mut retractions = InProgressRetractions::default();
129
130            for update in updates {
131                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
132                    .step(
133                        ApplyState::new(update),
134                        self,
135                        &mut retractions,
136                        local_expression_cache,
137                    )
138                    .await;
139                apply_state = next_apply_state;
140                builtin_table_updates.extend(builtin_table_update);
141                catalog_updates.extend(catalog_update);
142            }
143
144            // Apply remaining state.
145            let (builtin_table_update, catalog_update) = apply_state
146                .apply(self, &mut retractions, local_expression_cache)
147                .await;
148            builtin_table_updates.extend(builtin_table_update);
149            catalog_updates.extend(catalog_update);
150        }
151
152        (builtin_table_updates, catalog_updates)
153    }
154
155    /// It can happen that the sequencing logic creates "fluctuating" updates
156    /// for a given catalog ID. For example, when doing a `DROP OWNED BY ...`,
157    /// for a table, there will be a retraction of the original table state,
158    /// then an addition for the same table but stripped of some of the roles
159    /// and access things, and then a retraction for that intermediate table
160    /// state. By consolidating, the intermediate state addition/retraction will
161    /// cancel out and we'll only see the retraction for the original state.
162    fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
163        let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
164            .into_iter()
165            .map(|update| (update.kind, update.ts, update.diff.into()))
166            .collect_vec();
167
168        consolidate_updates(&mut updates);
169
170        updates
171            .into_iter()
172            .map(|(kind, ts, diff)| StateUpdate {
173                kind,
174                ts,
175                diff: diff
176                    .try_into()
177                    .expect("catalog state cannot have diff other than -1 or 1"),
178            })
179            .collect_vec()
180    }
181
182    #[instrument(level = "debug")]
183    fn apply_updates_inner(
184        &mut self,
185        updates: Vec<StateUpdate>,
186        retractions: &mut InProgressRetractions,
187        local_expression_cache: &mut LocalExpressionCache,
188    ) -> Result<
189        (
190            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
191            Vec<ParsedStateUpdate>,
192        ),
193        CatalogError,
194    > {
195        soft_assert_no_log!(
196            updates.iter().map(|update| update.ts).all_equal(),
197            "all timestamps should be equal: {updates:?}"
198        );
199
200        let mut update_system_config = false;
201
202        let mut builtin_table_updates = Vec::with_capacity(updates.len());
203        let mut catalog_updates = Vec::new();
204
205        for state_update in updates {
206            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
207                update_system_config = true;
208            }
209
210            match state_update.diff {
211                StateDiff::Retraction => {
212                    // We want the parsed catalog updates to match the state of
213                    // the catalog _before_ applying a retraction. So that we can
214                    // still have useful in-memory state to work with.
215                    if let Some(update) =
216                        parsed_state_updates::parse_state_update(self, state_update.clone())
217                    {
218                        catalog_updates.push(update);
219                    }
220
221                    // We want the builtin table retraction to match the state of the catalog
222                    // before applying the update.
223                    builtin_table_updates.extend(self.generate_builtin_table_update(
224                        state_update.kind.clone(),
225                        state_update.diff,
226                    ));
227                    self.apply_update(
228                        state_update.kind,
229                        state_update.diff,
230                        retractions,
231                        local_expression_cache,
232                    )?;
233                }
234                StateDiff::Addition => {
235                    self.apply_update(
236                        state_update.kind.clone(),
237                        state_update.diff,
238                        retractions,
239                        local_expression_cache,
240                    )?;
241                    // We want the builtin table addition to match the state of
242                    // the catalog after applying the update. So that we already
243                    // have useful in-memory state to work with.
244                    builtin_table_updates.extend(self.generate_builtin_table_update(
245                        state_update.kind.clone(),
246                        state_update.diff,
247                    ));
248
249                    // We want the parsed catalog updates to match the state of
250                    // the catalog _after_ applying an addition.
251                    if let Some(update) =
252                        parsed_state_updates::parse_state_update(self, state_update.clone())
253                    {
254                        catalog_updates.push(update);
255                    }
256                }
257            }
258        }
259
260        if update_system_config {
261            self.system_configuration.dyncfg_updates();
262        }
263
264        Ok((builtin_table_updates, catalog_updates))
265    }
266
267    #[instrument(level = "debug")]
268    fn apply_update(
269        &mut self,
270        kind: StateUpdateKind,
271        diff: StateDiff,
272        retractions: &mut InProgressRetractions,
273        local_expression_cache: &mut LocalExpressionCache,
274    ) -> Result<(), CatalogError> {
275        match kind {
276            StateUpdateKind::Role(role) => {
277                self.apply_role_update(role, diff, retractions);
278            }
279            StateUpdateKind::RoleAuth(role_auth) => {
280                self.apply_role_auth_update(role_auth, diff, retractions);
281            }
282            StateUpdateKind::Database(database) => {
283                self.apply_database_update(database, diff, retractions);
284            }
285            StateUpdateKind::Schema(schema) => {
286                self.apply_schema_update(schema, diff, retractions);
287            }
288            StateUpdateKind::DefaultPrivilege(default_privilege) => {
289                self.apply_default_privilege_update(default_privilege, diff, retractions);
290            }
291            StateUpdateKind::SystemPrivilege(system_privilege) => {
292                self.apply_system_privilege_update(system_privilege, diff, retractions);
293            }
294            StateUpdateKind::SystemConfiguration(system_configuration) => {
295                self.apply_system_configuration_update(system_configuration, diff, retractions);
296            }
297            StateUpdateKind::Cluster(cluster) => {
298                self.apply_cluster_update(cluster, diff, retractions);
299            }
300            StateUpdateKind::NetworkPolicy(network_policy) => {
301                self.apply_network_policy_update(network_policy, diff, retractions);
302            }
303            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
304                self.apply_introspection_source_index_update(
305                    introspection_source_index,
306                    diff,
307                    retractions,
308                );
309            }
310            StateUpdateKind::ClusterReplica(cluster_replica) => {
311                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
312            }
313            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
314                self.apply_system_object_mapping_update(
315                    system_object_mapping,
316                    diff,
317                    retractions,
318                    local_expression_cache,
319                );
320            }
321            StateUpdateKind::TemporaryItem(item) => {
322                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
323            }
324            StateUpdateKind::Item(item) => {
325                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
326            }
327            StateUpdateKind::Comment(comment) => {
328                self.apply_comment_update(comment, diff, retractions);
329            }
330            StateUpdateKind::SourceReferences(source_reference) => {
331                self.apply_source_references_update(source_reference, diff, retractions);
332            }
333            StateUpdateKind::AuditLog(_audit_log) => {
334                // Audit logs are not stored in-memory.
335            }
336            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
337                self.apply_storage_collection_metadata_update(
338                    storage_collection_metadata,
339                    diff,
340                    retractions,
341                );
342            }
343            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
344                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
345            }
346        }
347
348        Ok(())
349    }
350
351    #[instrument(level = "debug")]
352    fn apply_role_auth_update(
353        &mut self,
354        role_auth: mz_catalog::durable::RoleAuth,
355        diff: StateDiff,
356        retractions: &mut InProgressRetractions,
357    ) {
358        apply_with_update(
359            &mut self.role_auth_by_id,
360            role_auth,
361            |role_auth| role_auth.role_id,
362            diff,
363            &mut retractions.role_auths,
364        );
365    }
366
367    #[instrument(level = "debug")]
368    fn apply_role_update(
369        &mut self,
370        role: mz_catalog::durable::Role,
371        diff: StateDiff,
372        retractions: &mut InProgressRetractions,
373    ) {
374        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
375        apply_with_update(
376            &mut self.roles_by_id,
377            role,
378            |role| role.id,
379            diff,
380            &mut retractions.roles,
381        );
382    }
383
384    #[instrument(level = "debug")]
385    fn apply_database_update(
386        &mut self,
387        database: mz_catalog::durable::Database,
388        diff: StateDiff,
389        retractions: &mut InProgressRetractions,
390    ) {
391        apply_inverted_lookup(
392            &mut self.database_by_name,
393            &database.name,
394            database.id,
395            diff,
396        );
397        apply_with_update(
398            &mut self.database_by_id,
399            database,
400            |database| database.id,
401            diff,
402            &mut retractions.databases,
403        );
404    }
405
406    #[instrument(level = "debug")]
407    fn apply_schema_update(
408        &mut self,
409        schema: mz_catalog::durable::Schema,
410        diff: StateDiff,
411        retractions: &mut InProgressRetractions,
412    ) {
413        let (schemas_by_id, schemas_by_name) = match &schema.database_id {
414            Some(database_id) => {
415                let db = self
416                    .database_by_id
417                    .get_mut(database_id)
418                    .expect("catalog out of sync");
419                (&mut db.schemas_by_id, &mut db.schemas_by_name)
420            }
421            None => (
422                &mut self.ambient_schemas_by_id,
423                &mut self.ambient_schemas_by_name,
424            ),
425        };
426        apply_inverted_lookup(schemas_by_name, &schema.name, schema.id, diff);
427        apply_with_update(
428            schemas_by_id,
429            schema,
430            |schema| schema.id,
431            diff,
432            &mut retractions.schemas,
433        );
434    }
435
436    #[instrument(level = "debug")]
437    fn apply_default_privilege_update(
438        &mut self,
439        default_privilege: mz_catalog::durable::DefaultPrivilege,
440        diff: StateDiff,
441        _retractions: &mut InProgressRetractions,
442    ) {
443        match diff {
444            StateDiff::Addition => self
445                .default_privileges
446                .grant(default_privilege.object, default_privilege.acl_item),
447            StateDiff::Retraction => self
448                .default_privileges
449                .revoke(&default_privilege.object, &default_privilege.acl_item),
450        }
451    }
452
453    #[instrument(level = "debug")]
454    fn apply_system_privilege_update(
455        &mut self,
456        system_privilege: MzAclItem,
457        diff: StateDiff,
458        _retractions: &mut InProgressRetractions,
459    ) {
460        match diff {
461            StateDiff::Addition => self.system_privileges.grant(system_privilege),
462            StateDiff::Retraction => self.system_privileges.revoke(&system_privilege),
463        }
464    }
465
466    #[instrument(level = "debug")]
467    fn apply_system_configuration_update(
468        &mut self,
469        system_configuration: mz_catalog::durable::SystemConfiguration,
470        diff: StateDiff,
471        _retractions: &mut InProgressRetractions,
472    ) {
473        let res = match diff {
474            StateDiff::Addition => self.insert_system_configuration(
475                &system_configuration.name,
476                VarInput::Flat(&system_configuration.value),
477            ),
478            StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
479        };
480        match res {
481            Ok(_) => (),
482            // When system variables are deleted, nothing deletes them from the underlying
483            // durable catalog, which isn't great. Still, we need to be able to ignore
484            // unknown variables.
485            Err(Error {
486                kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
487            }) => {
488                warn!(%name, "unknown system parameter from catalog storage");
489            }
490            Err(e) => panic!("unable to update system variable: {e:?}"),
491        }
492    }
493
494    #[instrument(level = "debug")]
495    fn apply_cluster_update(
496        &mut self,
497        cluster: mz_catalog::durable::Cluster,
498        diff: StateDiff,
499        retractions: &mut InProgressRetractions,
500    ) {
501        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
502        apply_with_update(
503            &mut self.clusters_by_id,
504            cluster,
505            |cluster| cluster.id,
506            diff,
507            &mut retractions.clusters,
508        );
509    }
510
511    #[instrument(level = "debug")]
512    fn apply_network_policy_update(
513        &mut self,
514        policy: mz_catalog::durable::NetworkPolicy,
515        diff: StateDiff,
516        retractions: &mut InProgressRetractions,
517    ) {
518        apply_inverted_lookup(
519            &mut self.network_policies_by_name,
520            &policy.name,
521            policy.id,
522            diff,
523        );
524        apply_with_update(
525            &mut self.network_policies_by_id,
526            policy,
527            |policy| policy.id,
528            diff,
529            &mut retractions.network_policies,
530        );
531    }
532
533    #[instrument(level = "debug")]
534    fn apply_introspection_source_index_update(
535        &mut self,
536        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
537        diff: StateDiff,
538        retractions: &mut InProgressRetractions,
539    ) {
540        let cluster = self
541            .clusters_by_id
542            .get_mut(&introspection_source_index.cluster_id)
543            .expect("catalog out of sync");
544        let log = BUILTIN_LOG_LOOKUP
545            .get(introspection_source_index.name.as_str())
546            .expect("missing log");
547        apply_inverted_lookup(
548            &mut cluster.log_indexes,
549            &log.variant,
550            introspection_source_index.index_id,
551            diff,
552        );
553
554        match diff {
555            StateDiff::Addition => {
556                if let Some(mut entry) = retractions
557                    .introspection_source_indexes
558                    .remove(&introspection_source_index.item_id)
559                {
560                    // This should only happen during startup as a result of builtin migrations. We
561                    // create a new index item and replace the old one with it.
562                    let (index_name, index) = self.create_introspection_source_index(
563                        introspection_source_index.cluster_id,
564                        log,
565                        introspection_source_index.index_id,
566                    );
567                    assert_eq!(entry.id, introspection_source_index.item_id);
568                    assert_eq!(entry.oid, introspection_source_index.oid);
569                    assert_eq!(entry.name, index_name);
570                    entry.item = index;
571                    self.insert_entry(entry);
572                } else {
573                    self.insert_introspection_source_index(
574                        introspection_source_index.cluster_id,
575                        log,
576                        introspection_source_index.item_id,
577                        introspection_source_index.index_id,
578                        introspection_source_index.oid,
579                    );
580                }
581            }
582            StateDiff::Retraction => {
583                let entry = self.drop_item(introspection_source_index.item_id);
584                retractions
585                    .introspection_source_indexes
586                    .insert(entry.id, entry);
587            }
588        }
589    }
590
591    #[instrument(level = "debug")]
592    fn apply_cluster_replica_update(
593        &mut self,
594        cluster_replica: mz_catalog::durable::ClusterReplica,
595        diff: StateDiff,
596        _retractions: &mut InProgressRetractions,
597    ) {
598        let cluster = self
599            .clusters_by_id
600            .get(&cluster_replica.cluster_id)
601            .expect("catalog out of sync");
602        let azs = cluster.availability_zones();
603        let location = self
604            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
605            .expect("catalog in unexpected state");
606        let cluster = self
607            .clusters_by_id
608            .get_mut(&cluster_replica.cluster_id)
609            .expect("catalog out of sync");
610        apply_inverted_lookup(
611            &mut cluster.replica_id_by_name_,
612            &cluster_replica.name,
613            cluster_replica.replica_id,
614            diff,
615        );
616        match diff {
617            StateDiff::Retraction => {
618                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
619                assert!(
620                    prev.is_some(),
621                    "retraction does not match existing value: {:?}",
622                    cluster_replica.replica_id
623                );
624            }
625            StateDiff::Addition => {
626                let logging = ReplicaLogging {
627                    log_logging: cluster_replica.config.logging.log_logging,
628                    interval: cluster_replica.config.logging.interval,
629                };
630                let config = ReplicaConfig {
631                    location,
632                    compute: ComputeReplicaConfig { logging },
633                };
634                let mem_cluster_replica = ClusterReplica {
635                    name: cluster_replica.name.clone(),
636                    cluster_id: cluster_replica.cluster_id,
637                    replica_id: cluster_replica.replica_id,
638                    config,
639                    owner_id: cluster_replica.owner_id,
640                };
641                let prev = cluster
642                    .replicas_by_id_
643                    .insert(cluster_replica.replica_id, mem_cluster_replica);
644                assert_eq!(
645                    prev, None,
646                    "values must be explicitly retracted before inserting a new value: {:?}",
647                    cluster_replica.replica_id
648                );
649            }
650        }
651    }
652
653    #[instrument(level = "debug")]
654    fn apply_system_object_mapping_update(
655        &mut self,
656        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
657        diff: StateDiff,
658        retractions: &mut InProgressRetractions,
659        local_expression_cache: &mut LocalExpressionCache,
660    ) {
661        let item_id = system_object_mapping.unique_identifier.catalog_id;
662        let global_id = system_object_mapping.unique_identifier.global_id;
663
664        if system_object_mapping.unique_identifier.runtime_alterable() {
665            // Runtime-alterable system objects have real entries in the items
666            // collection and so get handled through the normal `insert_item`
667            // and `drop_item` code paths.
668            return;
669        }
670
671        if let StateDiff::Retraction = diff {
672            let entry = self.drop_item(item_id);
673            retractions.system_object_mappings.insert(item_id, entry);
674            return;
675        }
676
677        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
678            // This implies that we updated the fingerprint for some builtin item. The retraction
679            // was parsed, planned, and optimized using the compiled in definition, not the
680            // definition from a previous version. So we can just stick the old entry back into the
681            // catalog.
682            self.insert_entry(entry);
683            return;
684        }
685
686        let builtin = BUILTIN_LOOKUP
687            .get(&system_object_mapping.description)
688            .expect("missing builtin")
689            .1;
690        let schema_name = builtin.schema();
691        let schema_id = self
692            .ambient_schemas_by_name
693            .get(schema_name)
694            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
695        let name = QualifiedItemName {
696            qualifiers: ItemQualifiers {
697                database_spec: ResolvedDatabaseSpecifier::Ambient,
698                schema_spec: SchemaSpecifier::Id(*schema_id),
699            },
700            item: builtin.name().into(),
701        };
702        match builtin {
703            Builtin::Log(log) => {
704                let mut acl_items = vec![rbac::owner_privilege(
705                    mz_sql::catalog::ObjectType::Source,
706                    MZ_SYSTEM_ROLE_ID,
707                )];
708                acl_items.extend_from_slice(&log.access);
709                self.insert_item(
710                    item_id,
711                    log.oid,
712                    name.clone(),
713                    CatalogItem::Log(Log {
714                        variant: log.variant,
715                        global_id,
716                    }),
717                    MZ_SYSTEM_ROLE_ID,
718                    PrivilegeMap::from_mz_acl_items(acl_items),
719                );
720            }
721
722            Builtin::Table(table) => {
723                let mut acl_items = vec![rbac::owner_privilege(
724                    mz_sql::catalog::ObjectType::Table,
725                    MZ_SYSTEM_ROLE_ID,
726                )];
727                acl_items.extend_from_slice(&table.access);
728
729                self.insert_item(
730                    item_id,
731                    table.oid,
732                    name.clone(),
733                    CatalogItem::Table(Table {
734                        create_sql: None,
735                        desc: VersionedRelationDesc::new(table.desc.clone()),
736                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
737                        conn_id: None,
738                        resolved_ids: ResolvedIds::empty(),
739                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
740                            || {
741                                self.system_config()
742                                    .metrics_retention()
743                                    .try_into()
744                                    .expect("invalid metrics retention")
745                            },
746                        ),
747                        is_retained_metrics_object: table.is_retained_metrics_object,
748                        data_source: TableDataSource::TableWrites {
749                            defaults: vec![Expr::null(); table.desc.arity()],
750                        },
751                    }),
752                    MZ_SYSTEM_ROLE_ID,
753                    PrivilegeMap::from_mz_acl_items(acl_items),
754                );
755            }
756            Builtin::Index(index) => {
757                let custom_logical_compaction_window =
758                    index.is_retained_metrics_object.then(|| {
759                        self.system_config()
760                            .metrics_retention()
761                            .try_into()
762                            .expect("invalid metrics retention")
763                    });
764                // Indexes can't be versioned.
765                let versions = BTreeMap::new();
766
767                let item = self
768                    .parse_item(
769                        global_id,
770                        &index.create_sql(),
771                        &versions,
772                        None,
773                        index.is_retained_metrics_object,
774                        custom_logical_compaction_window,
775                        local_expression_cache,
776                        None,
777                    )
778                    .unwrap_or_else(|e| {
779                        panic!(
780                            "internal error: failed to load bootstrap index:\n\
781                                    {}\n\
782                                    error:\n\
783                                    {:?}\n\n\
784                                    make sure that the schema name is specified in the builtin index's create sql statement.",
785                            index.name, e
786                        )
787                    });
788                let CatalogItem::Index(_) = item else {
789                    panic!(
790                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
791                        index.name
792                    );
793                };
794
795                self.insert_item(
796                    item_id,
797                    index.oid,
798                    name,
799                    item,
800                    MZ_SYSTEM_ROLE_ID,
801                    PrivilegeMap::default(),
802                );
803            }
804            Builtin::View(_) => {
805                // parse_views is responsible for inserting all builtin views.
806                unreachable!("views added elsewhere");
807            }
808
809            // Note: Element types must be loaded before array types.
810            Builtin::Type(typ) => {
811                let typ = self.resolve_builtin_type_references(typ);
812                if let CatalogType::Array { element_reference } = typ.details.typ {
813                    let entry = self.get_entry_mut(&element_reference);
814                    let item_type = match &mut entry.item {
815                        CatalogItem::Type(item_type) => item_type,
816                        _ => unreachable!("types can only reference other types"),
817                    };
818                    item_type.details.array_id = Some(item_id);
819                }
820
821                let schema_id = self.resolve_system_schema(typ.schema);
822
823                self.insert_item(
824                    item_id,
825                    typ.oid,
826                    QualifiedItemName {
827                        qualifiers: ItemQualifiers {
828                            database_spec: ResolvedDatabaseSpecifier::Ambient,
829                            schema_spec: SchemaSpecifier::Id(schema_id),
830                        },
831                        item: typ.name.to_owned(),
832                    },
833                    CatalogItem::Type(Type {
834                        create_sql: None,
835                        global_id,
836                        details: typ.details.clone(),
837                        resolved_ids: ResolvedIds::empty(),
838                    }),
839                    MZ_SYSTEM_ROLE_ID,
840                    PrivilegeMap::from_mz_acl_items(vec![
841                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
842                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
843                    ]),
844                );
845            }
846
847            Builtin::Func(func) => {
848                // This OID is never used. `func` has a `Vec` of implementations and
849                // each implementation has its own OID. Those are the OIDs that are
850                // actually used by the system.
851                let oid = INVALID_OID;
852                self.insert_item(
853                    item_id,
854                    oid,
855                    name.clone(),
856                    CatalogItem::Func(Func {
857                        inner: func.inner,
858                        global_id,
859                    }),
860                    MZ_SYSTEM_ROLE_ID,
861                    PrivilegeMap::default(),
862                );
863            }
864
865            Builtin::Source(coll) => {
866                let mut acl_items = vec![rbac::owner_privilege(
867                    mz_sql::catalog::ObjectType::Source,
868                    MZ_SYSTEM_ROLE_ID,
869                )];
870                acl_items.extend_from_slice(&coll.access);
871
872                self.insert_item(
873                    item_id,
874                    coll.oid,
875                    name.clone(),
876                    CatalogItem::Source(Source {
877                        create_sql: None,
878                        data_source: DataSourceDesc::Introspection(coll.data_source),
879                        desc: coll.desc.clone(),
880                        global_id,
881                        timeline: Timeline::EpochMilliseconds,
882                        resolved_ids: ResolvedIds::empty(),
883                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
884                            || {
885                                self.system_config()
886                                    .metrics_retention()
887                                    .try_into()
888                                    .expect("invalid metrics retention")
889                            },
890                        ),
891                        is_retained_metrics_object: coll.is_retained_metrics_object,
892                    }),
893                    MZ_SYSTEM_ROLE_ID,
894                    PrivilegeMap::from_mz_acl_items(acl_items),
895                );
896            }
897            Builtin::ContinualTask(ct) => {
898                let mut acl_items = vec![rbac::owner_privilege(
899                    mz_sql::catalog::ObjectType::Source,
900                    MZ_SYSTEM_ROLE_ID,
901                )];
902                acl_items.extend_from_slice(&ct.access);
903                // Continual Tasks can't be versioned.
904                let versions = BTreeMap::new();
905
906                let item = self
907                    .parse_item(
908                        global_id,
909                        &ct.create_sql(),
910                        &versions,
911                        None,
912                        false,
913                        None,
914                        local_expression_cache,
915                        None,
916                    )
917                    .unwrap_or_else(|e| {
918                        panic!(
919                            "internal error: failed to load bootstrap continual task:\n\
920                                    {}\n\
921                                    error:\n\
922                                    {:?}\n\n\
923                                    make sure that the schema name is specified in the builtin continual task's create sql statement.",
924                            ct.name, e
925                        )
926                    });
927                let CatalogItem::ContinualTask(_) = &item else {
928                    panic!(
929                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
930                        ct.name
931                    );
932                };
933
934                self.insert_item(
935                    item_id,
936                    ct.oid,
937                    name,
938                    item,
939                    MZ_SYSTEM_ROLE_ID,
940                    PrivilegeMap::from_mz_acl_items(acl_items),
941                );
942            }
943            Builtin::Connection(connection) => {
944                // Connections can't be versioned.
945                let versions = BTreeMap::new();
946                let mut item = self
947                    .parse_item(
948                        global_id,
949                        connection.sql,
950                        &versions,
951                        None,
952                        false,
953                        None,
954                        local_expression_cache,
955                        None,
956                    )
957                    .unwrap_or_else(|e| {
958                        panic!(
959                            "internal error: failed to load bootstrap connection:\n\
960                                    {}\n\
961                                    error:\n\
962                                    {:?}\n\n\
963                                    make sure that the schema name is specified in the builtin connection's create sql statement.",
964                            connection.name, e
965                        )
966                    });
967                let CatalogItem::Connection(_) = &mut item else {
968                    panic!(
969                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
970                        connection.name
971                    );
972                };
973
974                let mut acl_items = vec![rbac::owner_privilege(
975                    mz_sql::catalog::ObjectType::Connection,
976                    connection.owner_id.clone(),
977                )];
978                acl_items.extend_from_slice(connection.access);
979
980                self.insert_item(
981                    item_id,
982                    connection.oid,
983                    name.clone(),
984                    item,
985                    connection.owner_id.clone(),
986                    PrivilegeMap::from_mz_acl_items(acl_items),
987                );
988            }
989        }
990    }
991
992    #[instrument(level = "debug")]
993    fn apply_temporary_item_update(
994        &mut self,
995        temporary_item: TemporaryItem,
996        diff: StateDiff,
997        retractions: &mut InProgressRetractions,
998        local_expression_cache: &mut LocalExpressionCache,
999    ) {
1000        match diff {
1001            StateDiff::Addition => {
1002                let TemporaryItem {
1003                    id,
1004                    oid,
1005                    global_id,
1006                    schema_id,
1007                    name,
1008                    conn_id,
1009                    create_sql,
1010                    owner_id,
1011                    privileges,
1012                    extra_versions,
1013                } = temporary_item;
1014                // Lazily create the temporary schema if it doesn't exist yet.
1015                // We need the conn_id to create the schema, and it should always be Some for temp items.
1016                let temp_conn_id = conn_id
1017                    .as_ref()
1018                    .expect("temporary items must have a connection id");
1019                if !self.temporary_schemas.contains_key(temp_conn_id) {
1020                    self.create_temporary_schema(temp_conn_id, owner_id)
1021                        .expect("failed to create temporary schema");
1022                }
1023                let schema = self.find_temp_schema(&schema_id);
1024                let name = QualifiedItemName {
1025                    qualifiers: ItemQualifiers {
1026                        database_spec: schema.database().clone(),
1027                        schema_spec: schema.id().clone(),
1028                    },
1029                    item: name.clone(),
1030                };
1031
1032                let entry = match retractions.temp_items.remove(&id) {
1033                    Some(mut retraction) => {
1034                        assert_eq!(retraction.id, id);
1035
1036                        // We only reparse the SQL if it's changed. Otherwise, we use the existing
1037                        // item. This is a performance optimization and not needed for correctness.
1038                        // This makes it difficult to use the `UpdateFrom` trait, but the structure
1039                        // is still the same as the trait.
1040                        if retraction.create_sql() != create_sql {
1041                            let mut catalog_item = self
1042                                .deserialize_item(
1043                                    global_id,
1044                                    &create_sql,
1045                                    &extra_versions,
1046                                    local_expression_cache,
1047                                    Some(retraction.item),
1048                                )
1049                                .unwrap_or_else(|e| {
1050                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
1051                                });
1052                            // Have to patch up the item because parsing doesn't
1053                            // take into account temporary schemas/conn_id.
1054                            // NOTE(aljoscha): I don't like how we're patching
1055                            // this in here, but it's but one of the ways in
1056                            // which temporary items are a bit weird. So, here
1057                            // we are ...
1058                            catalog_item.set_conn_id(conn_id);
1059                            retraction.item = catalog_item;
1060                        }
1061
1062                        retraction.id = id;
1063                        retraction.oid = oid;
1064                        retraction.name = name;
1065                        retraction.owner_id = owner_id;
1066                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1067                        retraction
1068                    }
1069                    None => {
1070                        let mut catalog_item = self
1071                            .deserialize_item(
1072                                global_id,
1073                                &create_sql,
1074                                &extra_versions,
1075                                local_expression_cache,
1076                                None,
1077                            )
1078                            .unwrap_or_else(|e| {
1079                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1080                            });
1081
1082                        // Have to patch up the item because parsing doesn't
1083                        // take into account temporary schemas/conn_id.
1084                        // NOTE(aljoscha): I don't like how we're patching this
1085                        // in here, but it's but one of the ways in which
1086                        // temporary items are a bit weird. So, here we are ...
1087                        catalog_item.set_conn_id(conn_id);
1088
1089                        CatalogEntry {
1090                            item: catalog_item,
1091                            referenced_by: Vec::new(),
1092                            used_by: Vec::new(),
1093                            id,
1094                            oid,
1095                            name,
1096                            owner_id,
1097                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1098                        }
1099                    }
1100                };
1101                self.insert_entry(entry);
1102            }
1103            StateDiff::Retraction => {
1104                let entry = self.drop_item(temporary_item.id);
1105                retractions.temp_items.insert(temporary_item.id, entry);
1106            }
1107        }
1108    }
1109
1110    #[instrument(level = "debug")]
1111    fn apply_item_update(
1112        &mut self,
1113        item: mz_catalog::durable::Item,
1114        diff: StateDiff,
1115        retractions: &mut InProgressRetractions,
1116        local_expression_cache: &mut LocalExpressionCache,
1117    ) -> Result<(), CatalogError> {
1118        match diff {
1119            StateDiff::Addition => {
1120                let key = item.key();
1121                let mz_catalog::durable::Item {
1122                    id,
1123                    oid,
1124                    global_id,
1125                    schema_id,
1126                    name,
1127                    create_sql,
1128                    owner_id,
1129                    privileges,
1130                    extra_versions,
1131                } = item;
1132                let schema = self.find_non_temp_schema(&schema_id);
1133                let name = QualifiedItemName {
1134                    qualifiers: ItemQualifiers {
1135                        database_spec: schema.database().clone(),
1136                        schema_spec: schema.id().clone(),
1137                    },
1138                    item: name.clone(),
1139                };
1140                let entry = match retractions.items.remove(&key) {
1141                    Some(retraction) => {
1142                        assert_eq!(retraction.id, item.id);
1143
1144                        let item = self
1145                            .deserialize_item(
1146                                global_id,
1147                                &create_sql,
1148                                &extra_versions,
1149                                local_expression_cache,
1150                                Some(retraction.item),
1151                            )
1152                            .unwrap_or_else(|e| {
1153                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1154                            });
1155
1156                        CatalogEntry {
1157                            item,
1158                            id,
1159                            oid,
1160                            name,
1161                            owner_id,
1162                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1163                            referenced_by: retraction.referenced_by,
1164                            used_by: retraction.used_by,
1165                        }
1166                    }
1167                    None => {
1168                        let catalog_item = self
1169                            .deserialize_item(
1170                                global_id,
1171                                &create_sql,
1172                                &extra_versions,
1173                                local_expression_cache,
1174                                None,
1175                            )
1176                            .unwrap_or_else(|e| {
1177                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1178                            });
1179                        CatalogEntry {
1180                            item: catalog_item,
1181                            referenced_by: Vec::new(),
1182                            used_by: Vec::new(),
1183                            id,
1184                            oid,
1185                            name,
1186                            owner_id,
1187                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1188                        }
1189                    }
1190                };
1191
1192                self.insert_entry(entry);
1193            }
1194            StateDiff::Retraction => {
1195                let entry = self.drop_item(item.id);
1196                let key = item.into_key_value().0;
1197                retractions.items.insert(key, entry);
1198            }
1199        }
1200        Ok(())
1201    }
1202
1203    #[instrument(level = "debug")]
1204    fn apply_comment_update(
1205        &mut self,
1206        comment: mz_catalog::durable::Comment,
1207        diff: StateDiff,
1208        _retractions: &mut InProgressRetractions,
1209    ) {
1210        match diff {
1211            StateDiff::Addition => {
1212                let prev = self.comments.update_comment(
1213                    comment.object_id,
1214                    comment.sub_component,
1215                    Some(comment.comment),
1216                );
1217                assert_eq!(
1218                    prev, None,
1219                    "values must be explicitly retracted before inserting a new value"
1220                );
1221            }
1222            StateDiff::Retraction => {
1223                let prev =
1224                    self.comments
1225                        .update_comment(comment.object_id, comment.sub_component, None);
1226                assert_eq!(
1227                    prev,
1228                    Some(comment.comment),
1229                    "retraction does not match existing value: ({:?}, {:?})",
1230                    comment.object_id,
1231                    comment.sub_component,
1232                );
1233            }
1234        }
1235    }
1236
1237    #[instrument(level = "debug")]
1238    fn apply_source_references_update(
1239        &mut self,
1240        source_references: mz_catalog::durable::SourceReferences,
1241        diff: StateDiff,
1242        _retractions: &mut InProgressRetractions,
1243    ) {
1244        match diff {
1245            StateDiff::Addition => {
1246                let prev = self
1247                    .source_references
1248                    .insert(source_references.source_id, source_references.into());
1249                assert!(
1250                    prev.is_none(),
1251                    "values must be explicitly retracted before inserting a new value: {prev:?}"
1252                );
1253            }
1254            StateDiff::Retraction => {
1255                let prev = self.source_references.remove(&source_references.source_id);
1256                assert!(
1257                    prev.is_some(),
1258                    "retraction for a non-existent existing value: {source_references:?}"
1259                );
1260            }
1261        }
1262    }
1263
1264    #[instrument(level = "debug")]
1265    fn apply_storage_collection_metadata_update(
1266        &mut self,
1267        storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1268        diff: StateDiff,
1269        _retractions: &mut InProgressRetractions,
1270    ) {
1271        apply_inverted_lookup(
1272            &mut self.storage_metadata.collection_metadata,
1273            &storage_collection_metadata.id,
1274            storage_collection_metadata.shard,
1275            diff,
1276        );
1277    }
1278
1279    #[instrument(level = "debug")]
1280    fn apply_unfinalized_shard_update(
1281        &mut self,
1282        unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1283        diff: StateDiff,
1284        _retractions: &mut InProgressRetractions,
1285    ) {
1286        match diff {
1287            StateDiff::Addition => {
1288                let newly_inserted = self
1289                    .storage_metadata
1290                    .unfinalized_shards
1291                    .insert(unfinalized_shard.shard);
1292                assert!(
1293                    newly_inserted,
1294                    "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1295                );
1296            }
1297            StateDiff::Retraction => {
1298                let removed = self
1299                    .storage_metadata
1300                    .unfinalized_shards
1301                    .remove(&unfinalized_shard.shard);
1302                assert!(
1303                    removed,
1304                    "retraction does not match existing value: {unfinalized_shard:?}"
1305                );
1306            }
1307        }
1308    }
1309
1310    /// Generate a list of `BuiltinTableUpdate`s that correspond to a list of updates made to the
1311    /// durable catalog.
1312    #[instrument]
1313    pub(crate) fn generate_builtin_table_updates(
1314        &self,
1315        updates: Vec<StateUpdate>,
1316    ) -> Vec<BuiltinTableUpdate> {
1317        let mut builtin_table_updates = Vec::new();
1318        for StateUpdate { kind, ts: _, diff } in updates {
1319            let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1320            let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1321            builtin_table_updates.extend(builtin_table_update);
1322        }
1323        builtin_table_updates
1324    }
1325
1326    /// Generate a list of `BuiltinTableUpdate`s that correspond to a single update made to the
1327    /// durable catalog.
1328    #[instrument(level = "debug")]
1329    pub(crate) fn generate_builtin_table_update(
1330        &self,
1331        kind: StateUpdateKind,
1332        diff: StateDiff,
1333    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1334        let diff = diff.into();
1335        match kind {
1336            StateUpdateKind::Role(role) => {
1337                let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1338                for group_id in role.membership.map.keys() {
1339                    builtin_table_updates
1340                        .push(self.pack_role_members_update(*group_id, role.id, diff))
1341                }
1342                builtin_table_updates
1343            }
1344            StateUpdateKind::RoleAuth(role_auth) => {
1345                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1346            }
1347            StateUpdateKind::Database(database) => {
1348                vec![self.pack_database_update(&database.id, diff)]
1349            }
1350            StateUpdateKind::Schema(schema) => {
1351                let db_spec = schema.database_id.into();
1352                vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1353            }
1354            StateUpdateKind::DefaultPrivilege(default_privilege) => {
1355                vec![self.pack_default_privileges_update(
1356                    &default_privilege.object,
1357                    &default_privilege.acl_item.grantee,
1358                    &default_privilege.acl_item.acl_mode,
1359                    diff,
1360                )]
1361            }
1362            StateUpdateKind::SystemPrivilege(system_privilege) => {
1363                vec![self.pack_system_privileges_update(system_privilege, diff)]
1364            }
1365            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1366            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1367            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1368                self.pack_item_update(introspection_source_index.item_id, diff)
1369            }
1370            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1371                cluster_replica.cluster_id,
1372                &cluster_replica.name,
1373                diff,
1374            ),
1375            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1376                // Runtime-alterable system objects have real entries in the
1377                // items collection and so get handled through the normal
1378                // `StateUpdateKind::Item`.`
1379                if !system_object_mapping.unique_identifier.runtime_alterable() {
1380                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1381                } else {
1382                    vec![]
1383                }
1384            }
1385            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1386            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1387            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1388                comment.object_id,
1389                comment.sub_component,
1390                &comment.comment,
1391                diff,
1392            )],
1393            StateUpdateKind::SourceReferences(source_references) => {
1394                self.pack_source_references_update(&source_references, diff)
1395            }
1396            StateUpdateKind::AuditLog(audit_log) => {
1397                vec![
1398                    self.pack_audit_log_update(&audit_log.event, diff)
1399                        .expect("could not pack audit log update"),
1400                ]
1401            }
1402            StateUpdateKind::NetworkPolicy(policy) => self
1403                .pack_network_policy_update(&policy.id, diff)
1404                .expect("could not pack audit log update"),
1405            StateUpdateKind::StorageCollectionMetadata(_)
1406            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1407        }
1408    }
1409
1410    fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1411        self.entry_by_id
1412            .get_mut(id)
1413            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1414    }
1415
1416    fn get_schema_mut(
1417        &mut self,
1418        database_spec: &ResolvedDatabaseSpecifier,
1419        schema_spec: &SchemaSpecifier,
1420        conn_id: &ConnectionId,
1421    ) -> &mut Schema {
1422        // Keep in sync with `get_schemas`
1423        match (database_spec, schema_spec) {
1424            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1425                .temporary_schemas
1426                .get_mut(conn_id)
1427                .expect("catalog out of sync"),
1428            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1429                .ambient_schemas_by_id
1430                .get_mut(id)
1431                .expect("catalog out of sync"),
1432            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1433                .database_by_id
1434                .get_mut(database_id)
1435                .expect("catalog out of sync")
1436                .schemas_by_id
1437                .get_mut(schema_id)
1438                .expect("catalog out of sync"),
1439            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1440                unreachable!("temporary schemas are in the ambient database")
1441            }
1442        }
1443    }
1444
1445    /// Install builtin views to the catalog. This is its own function so that views can be
1446    /// optimized in parallel.
1447    ///
1448    /// The implementation is similar to `apply_updates_for_bootstrap` and determines dependency
1449    /// problems by sniffing out specific errors and then retrying once those dependencies are
1450    /// complete. This doesn't work for everything (casts, function implementations) so we also need
    /// to have a bucket for everything at the end. Additionally, because this executes in parallel,
1452    /// we must maintain a completed set otherwise races could result in orphaned views languishing
1453    /// in awaiting with nothing retriggering the attempt.
1454    #[instrument(name = "catalog::parse_views")]
1455    async fn parse_builtin_views(
1456        state: &mut CatalogState,
1457        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
1458        retractions: &mut InProgressRetractions,
1459        local_expression_cache: &mut LocalExpressionCache,
1460    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1461        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
1462        let (updates, additions): (Vec<_>, Vec<_>) =
1463            builtin_views
1464                .into_iter()
1465                .partition_map(|(view, item_id, gid)| {
1466                    match retractions.system_object_mappings.remove(&item_id) {
1467                        Some(entry) => Either::Left(entry),
1468                        None => Either::Right((view, item_id, gid)),
1469                    }
1470                });
1471
1472        for entry in updates {
1473            // This implies that we updated the fingerprint for some builtin view. The retraction
1474            // was parsed, planned, and optimized using the compiled in definition, not the
1475            // definition from a previous version. So we can just stick the old entry back into the
1476            // catalog.
1477            let item_id = entry.id();
1478            state.insert_entry(entry);
1479            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
1480        }
1481
1482        let mut handles = Vec::new();
1483        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
1484            BTreeMap::new();
1485        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
1486        // Some errors are due to the implementation of casts or SQL functions that depend on some
1487        // view. Instead of figuring out the exact view dependency, delay these until the end.
1488        let mut awaiting_all = Vec::new();
1489        // Completed views, needed to avoid race conditions.
1490        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
1491        let mut completed_names: BTreeSet<String> = BTreeSet::new();
1492
1493        // Avoid some reference lifetime issues by not passing `builtin` into the spawned task.
1494        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
1495            .into_iter()
1496            .map(|(view, item_id, gid)| (item_id, (view, gid)))
1497            .collect();
1498        let item_ids: Vec<_> = views.keys().copied().collect();
1499
1500        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
1501        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
1502            if handles.is_empty() && ready.is_empty() {
1503                // Enqueue the views that were waiting for all the others.
1504                ready.extend(awaiting_all.drain(..));
1505            }
1506
1507            // Spawn tasks for all ready views.
1508            if !ready.is_empty() {
1509                let spawn_state = Arc::new(state.clone());
1510                while let Some(id) = ready.pop_front() {
1511                    let (view, global_id) = views.get(&id).expect("must exist");
1512                    let global_id = *global_id;
1513                    let create_sql = view.create_sql();
1514                    // Views can't be versioned.
1515                    let versions = BTreeMap::new();
1516
1517                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
1518                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
1519                    let task_state = Arc::clone(&spawn_state);
1520                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1521                    let handle = mz_ore::task::spawn_blocking(
1522                        || "parse view",
1523                        move || {
1524                            span.in_scope(|| {
1525                                let res = task_state.parse_item_inner(
1526                                    global_id,
1527                                    &create_sql,
1528                                    &versions,
1529                                    None,
1530                                    false,
1531                                    None,
1532                                    cached_expr,
1533                                    None,
1534                                );
1535                                (id, global_id, res)
1536                            })
1537                        },
1538                    );
1539                    handles.push(handle);
1540                }
1541            }
1542
1543            // Wait for a view to be ready.
1544            let (selected, _idx, remaining) = future::select_all(handles).await;
1545            handles = remaining;
1546            let (id, global_id, res) = selected;
1547            let mut insert_cached_expr = |cached_expr| {
1548                if let Some(cached_expr) = cached_expr {
1549                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
1550                }
1551            };
1552            match res {
1553                Ok((item, uncached_expr)) => {
1554                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1555                        local_expression_cache.insert_uncached_expression(
1556                            global_id,
1557                            uncached_expr,
1558                            optimizer_features,
1559                        );
1560                    }
1561                    // Add item to catalog.
1562                    let (view, _gid) = views.remove(&id).expect("must exist");
1563                    let schema_id = state
1564                        .ambient_schemas_by_name
1565                        .get(view.schema)
1566                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
1567                    let qname = QualifiedItemName {
1568                        qualifiers: ItemQualifiers {
1569                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1570                            schema_spec: SchemaSpecifier::Id(*schema_id),
1571                        },
1572                        item: view.name.into(),
1573                    };
1574                    let mut acl_items = vec![rbac::owner_privilege(
1575                        mz_sql::catalog::ObjectType::View,
1576                        MZ_SYSTEM_ROLE_ID,
1577                    )];
1578                    acl_items.extend_from_slice(&view.access);
1579
1580                    state.insert_item(
1581                        id,
1582                        view.oid,
1583                        qname,
1584                        item,
1585                        MZ_SYSTEM_ROLE_ID,
1586                        PrivilegeMap::from_mz_acl_items(acl_items),
1587                    );
1588
1589                    // Enqueue any items waiting on this dependency.
1590                    let mut resolved_dependent_items = Vec::new();
1591                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
1592                        resolved_dependent_items.extend(dependent_items);
1593                    }
1594                    let entry = state.get_entry(&id);
1595                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
1596                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
1597                        resolved_dependent_items.extend(dependent_items);
1598                    }
1599                    ready.extend(resolved_dependent_items);
1600
1601                    completed_ids.insert(id);
1602                    completed_names.insert(full_name);
1603                }
1604                // If we were missing a dependency, wait for it to be added.
1605                Err((
1606                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
1607                    cached_expr,
1608                )) => {
1609                    insert_cached_expr(cached_expr);
1610                    if completed_ids.contains(&missing_dep) {
1611                        ready.push_back(id);
1612                    } else {
1613                        awaiting_id_dependencies
1614                            .entry(missing_dep)
1615                            .or_default()
1616                            .push(id);
1617                    }
1618                }
1619                // If we were missing a dependency, wait for it to be added.
1620                Err((
1621                    AdapterError::PlanError(plan::PlanError::Catalog(
1622                        SqlCatalogError::UnknownItem(missing_dep),
1623                    )),
1624                    cached_expr,
1625                )) => {
1626                    insert_cached_expr(cached_expr);
1627                    match CatalogItemId::from_str(&missing_dep) {
1628                        Ok(missing_dep) => {
1629                            if completed_ids.contains(&missing_dep) {
1630                                ready.push_back(id);
1631                            } else {
1632                                awaiting_id_dependencies
1633                                    .entry(missing_dep)
1634                                    .or_default()
1635                                    .push(id);
1636                            }
1637                        }
1638                        Err(_) => {
1639                            if completed_names.contains(&missing_dep) {
1640                                ready.push_back(id);
1641                            } else {
1642                                awaiting_name_dependencies
1643                                    .entry(missing_dep)
1644                                    .or_default()
1645                                    .push(id);
1646                            }
1647                        }
1648                    }
1649                }
1650                Err((
1651                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
1652                    cached_expr,
1653                )) => {
1654                    insert_cached_expr(cached_expr);
1655                    awaiting_all.push(id);
1656                }
1657                Err((e, _)) => {
1658                    let (bad_view, _gid) = views.get(&id).expect("must exist");
1659                    panic!(
1660                        "internal error: failed to load bootstrap view:\n\
1661                            {name}\n\
1662                            error:\n\
1663                            {e:?}\n\n\
1664                            Make sure that the schema name is specified in the builtin view's create sql statement.
1665                            ",
1666                        name = bad_view.name,
1667                    )
1668                }
1669            }
1670        }
1671
1672        assert!(awaiting_id_dependencies.is_empty());
1673        assert!(
1674            awaiting_name_dependencies.is_empty(),
1675            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
1676        );
1677        assert!(awaiting_all.is_empty());
1678        assert!(views.is_empty());
1679
1680        // Generate a builtin table update for all the new views.
1681        builtin_table_updates.extend(
1682            item_ids
1683                .into_iter()
1684                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
1685        );
1686
1687        builtin_table_updates
1688    }
1689
1690    /// Associates a name, `CatalogItemId`, and entry.
1691    fn insert_entry(&mut self, entry: CatalogEntry) {
1692        if !entry.id.is_system() {
1693            if let Some(cluster_id) = entry.item.cluster_id() {
1694                self.clusters_by_id
1695                    .get_mut(&cluster_id)
1696                    .expect("catalog out of sync")
1697                    .bound_objects
1698                    .insert(entry.id);
1699            };
1700        }
1701
1702        for u in entry.references().items() {
1703            match self.entry_by_id.get_mut(u) {
1704                Some(metadata) => metadata.referenced_by.push(entry.id()),
1705                None => panic!(
1706                    "Catalog: missing dependent catalog item {} while installing {}",
1707                    &u,
1708                    self.resolve_full_name(entry.name(), entry.conn_id())
1709                ),
1710            }
1711        }
1712        for u in entry.uses() {
1713            // Ignore self for self-referential tasks (e.g. Continual Tasks), if
1714            // present.
1715            if u == entry.id() {
1716                continue;
1717            }
1718            match self.entry_by_id.get_mut(&u) {
1719                Some(metadata) => metadata.used_by.push(entry.id()),
1720                None => panic!(
1721                    "Catalog: missing dependent catalog item {} while installing {}",
1722                    &u,
1723                    self.resolve_full_name(entry.name(), entry.conn_id())
1724                ),
1725            }
1726        }
1727        for gid in entry.item.global_ids() {
1728            self.entry_by_global_id.insert(gid, entry.id());
1729        }
1730        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1731        // Lazily create the temporary schema if this is a temporary item and the schema
1732        // doesn't exist yet.
1733        if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
1734            && !self.temporary_schemas.contains_key(conn_id)
1735        {
1736            self.create_temporary_schema(conn_id, entry.owner_id)
1737                .expect("failed to create temporary schema");
1738        }
1739        let schema = self.get_schema_mut(
1740            &entry.name().qualifiers.database_spec,
1741            &entry.name().qualifiers.schema_spec,
1742            conn_id,
1743        );
1744
1745        let prev_id = match entry.item() {
1746            CatalogItem::Func(_) => schema
1747                .functions
1748                .insert(entry.name().item.clone(), entry.id()),
1749            CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1750            _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1751        };
1752
1753        assert!(
1754            prev_id.is_none(),
1755            "builtin name collision on {:?}",
1756            entry.name().item.clone()
1757        );
1758
1759        self.entry_by_id.insert(entry.id(), entry.clone());
1760    }
1761
1762    /// Associates a name, [`CatalogItemId`], and entry.
1763    fn insert_item(
1764        &mut self,
1765        id: CatalogItemId,
1766        oid: u32,
1767        name: QualifiedItemName,
1768        item: CatalogItem,
1769        owner_id: RoleId,
1770        privileges: PrivilegeMap,
1771    ) {
1772        let entry = CatalogEntry {
1773            item,
1774            name,
1775            id,
1776            oid,
1777            used_by: Vec::new(),
1778            referenced_by: Vec::new(),
1779            owner_id,
1780            privileges,
1781        };
1782
1783        self.insert_entry(entry);
1784    }
1785
1786    #[mz_ore::instrument(level = "trace")]
1787    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1788        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1789        for u in metadata.references().items() {
1790            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1791                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1792            }
1793        }
1794        for u in metadata.uses() {
1795            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1796                dep_metadata.used_by.retain(|u| *u != metadata.id())
1797            }
1798        }
1799        for gid in metadata.global_ids() {
1800            self.entry_by_global_id.remove(&gid);
1801        }
1802
1803        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1804        let schema = self.get_schema_mut(
1805            &metadata.name().qualifiers.database_spec,
1806            &metadata.name().qualifiers.schema_spec,
1807            conn_id,
1808        );
1809        if metadata.item_type() == CatalogItemType::Type {
1810            schema
1811                .types
1812                .remove(&metadata.name().item)
1813                .expect("catalog out of sync");
1814        } else {
1815            // Functions would need special handling, but we don't yet support
1816            // dropping functions.
1817            assert_ne!(metadata.item_type(), CatalogItemType::Func);
1818
1819            schema
1820                .items
1821                .remove(&metadata.name().item)
1822                .expect("catalog out of sync");
1823        };
1824
1825        if !id.is_system() {
1826            if let Some(cluster_id) = metadata.item().cluster_id() {
1827                assert!(
1828                    self.clusters_by_id
1829                        .get_mut(&cluster_id)
1830                        .expect("catalog out of sync")
1831                        .bound_objects
1832                        .remove(&id),
1833                    "catalog out of sync"
1834                );
1835            }
1836        }
1837
1838        metadata
1839    }
1840
1841    fn insert_introspection_source_index(
1842        &mut self,
1843        cluster_id: ClusterId,
1844        log: &'static BuiltinLog,
1845        item_id: CatalogItemId,
1846        global_id: GlobalId,
1847        oid: u32,
1848    ) {
1849        let (index_name, index) =
1850            self.create_introspection_source_index(cluster_id, log, global_id);
1851        self.insert_item(
1852            item_id,
1853            oid,
1854            index_name,
1855            index,
1856            MZ_SYSTEM_ROLE_ID,
1857            PrivilegeMap::default(),
1858        );
1859    }
1860
1861    fn create_introspection_source_index(
1862        &self,
1863        cluster_id: ClusterId,
1864        log: &'static BuiltinLog,
1865        global_id: GlobalId,
1866    ) -> (QualifiedItemName, CatalogItem) {
1867        let source_name = FullItemName {
1868            database: RawDatabaseSpecifier::Ambient,
1869            schema: log.schema.into(),
1870            item: log.name.into(),
1871        };
1872        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1873        let mut index_name = QualifiedItemName {
1874            qualifiers: ItemQualifiers {
1875                database_spec: ResolvedDatabaseSpecifier::Ambient,
1876                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1877            },
1878            item: index_name.clone(),
1879        };
1880        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1881        let index_item_name = index_name.item.clone();
1882        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1883        let index = CatalogItem::Index(Index {
1884            global_id,
1885            on: log_global_id,
1886            keys: log
1887                .variant
1888                .index_by()
1889                .into_iter()
1890                .map(MirScalarExpr::column)
1891                .collect(),
1892            create_sql: index_sql(
1893                index_item_name,
1894                cluster_id,
1895                source_name,
1896                &log.variant.desc(),
1897                &log.variant.index_by(),
1898            ),
1899            conn_id: None,
1900            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1901            cluster_id,
1902            is_retained_metrics_object: false,
1903            custom_logical_compaction_window: None,
1904        });
1905        (index_name, index)
1906    }
1907
1908    /// Insert system configuration `name` with `value`.
1909    ///
1910    /// Return a `bool` value indicating whether the configuration was modified
1911    /// by the call.
1912    fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1913        Ok(self.system_configuration.set(name, value)?)
1914    }
1915
1916    /// Reset system configuration `name`.
1917    ///
1918    /// Return a `bool` value indicating whether the configuration was modified
1919    /// by the call.
1920    fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1921        Ok(self.system_configuration.reset(name)?)
1922    }
1923}
1924
1925/// Sort [`StateUpdate`]s in dependency order.
1926///
1927/// # Panics
1928///
1929/// This function assumes that all provided `updates` have the same timestamp and will panic
1930/// otherwise. It also requires that the provided `updates` are consolidated, i.e. all contained
1931/// `StateUpdateKinds` are unique.
1932fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1933    fn push_update<T>(
1934        update: T,
1935        diff: StateDiff,
1936        retractions: &mut Vec<T>,
1937        additions: &mut Vec<T>,
1938    ) {
1939        match diff {
1940            StateDiff::Retraction => retractions.push(update),
1941            StateDiff::Addition => additions.push(update),
1942        }
1943    }
1944
1945    soft_assert_no_log!(
1946        updates.iter().map(|update| update.ts).all_equal(),
1947        "all timestamps should be equal: {updates:?}"
1948    );
1949    soft_assert_no_log!(
1950        {
1951            let mut dedup = BTreeSet::new();
1952            updates.iter().all(|update| dedup.insert(&update.kind))
1953        },
1954        "updates should be consolidated: {updates:?}"
1955    );
1956
1957    // Partition updates by type so that we can weave different update types into the right spots.
1958    let mut pre_cluster_retractions = Vec::new();
1959    let mut pre_cluster_additions = Vec::new();
1960    let mut cluster_retractions = Vec::new();
1961    let mut cluster_additions = Vec::new();
1962    let mut builtin_item_updates = Vec::new();
1963    let mut item_retractions = Vec::new();
1964    let mut item_additions = Vec::new();
1965    let mut temp_item_retractions = Vec::new();
1966    let mut temp_item_additions = Vec::new();
1967    let mut post_item_retractions = Vec::new();
1968    let mut post_item_additions = Vec::new();
1969    for update in updates {
1970        let diff = update.diff.clone();
1971        match update.kind {
1972            StateUpdateKind::Role(_)
1973            | StateUpdateKind::RoleAuth(_)
1974            | StateUpdateKind::Database(_)
1975            | StateUpdateKind::Schema(_)
1976            | StateUpdateKind::DefaultPrivilege(_)
1977            | StateUpdateKind::SystemPrivilege(_)
1978            | StateUpdateKind::SystemConfiguration(_)
1979            | StateUpdateKind::NetworkPolicy(_) => push_update(
1980                update,
1981                diff,
1982                &mut pre_cluster_retractions,
1983                &mut pre_cluster_additions,
1984            ),
1985            StateUpdateKind::Cluster(_)
1986            | StateUpdateKind::IntrospectionSourceIndex(_)
1987            | StateUpdateKind::ClusterReplica(_) => push_update(
1988                update,
1989                diff,
1990                &mut cluster_retractions,
1991                &mut cluster_additions,
1992            ),
1993            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1994                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
1995            }
1996            StateUpdateKind::TemporaryItem(item) => push_update(
1997                (item, update.ts, update.diff),
1998                diff,
1999                &mut temp_item_retractions,
2000                &mut temp_item_additions,
2001            ),
2002            StateUpdateKind::Item(item) => push_update(
2003                (item, update.ts, update.diff),
2004                diff,
2005                &mut item_retractions,
2006                &mut item_additions,
2007            ),
2008            StateUpdateKind::Comment(_)
2009            | StateUpdateKind::SourceReferences(_)
2010            | StateUpdateKind::AuditLog(_)
2011            | StateUpdateKind::StorageCollectionMetadata(_)
2012            | StateUpdateKind::UnfinalizedShard(_) => push_update(
2013                update,
2014                diff,
2015                &mut post_item_retractions,
2016                &mut post_item_additions,
2017            ),
2018        }
2019    }
2020
2021    // Sort builtin item updates by dependency.
2022    let builtin_item_updates = builtin_item_updates
2023        .into_iter()
2024        .map(|(system_object_mapping, ts, diff)| {
2025            let idx = BUILTIN_LOOKUP
2026                .get(&system_object_mapping.description)
2027                .expect("missing builtin")
2028                .0;
2029            (idx, system_object_mapping, ts, diff)
2030        })
2031        .sorted_by_key(|(idx, _, _, _)| *idx)
2032        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2033
2034    // Further partition builtin item updates.
2035    let mut other_builtin_retractions = Vec::new();
2036    let mut other_builtin_additions = Vec::new();
2037    let mut builtin_index_retractions = Vec::new();
2038    let mut builtin_index_additions = Vec::new();
2039    for (builtin_item_update, ts, diff) in builtin_item_updates {
2040        match &builtin_item_update.description.object_type {
2041            CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
2042                StateUpdate {
2043                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2044                    ts,
2045                    diff,
2046                },
2047                diff,
2048                &mut builtin_index_retractions,
2049                &mut builtin_index_additions,
2050            ),
2051            CatalogItemType::Table
2052            | CatalogItemType::Source
2053            | CatalogItemType::Sink
2054            | CatalogItemType::View
2055            | CatalogItemType::MaterializedView
2056            | CatalogItemType::Type
2057            | CatalogItemType::Func
2058            | CatalogItemType::Secret
2059            | CatalogItemType::Connection => push_update(
2060                StateUpdate {
2061                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2062                    ts,
2063                    diff,
2064                },
2065                diff,
2066                &mut other_builtin_retractions,
2067                &mut other_builtin_additions,
2068            ),
2069        }
2070    }
2071
2072    /// Sort items by their dependencies using topological sort.
2073    ///
2074    /// # Panics
2075    ///
2076    /// This function requires that all provided items have unique item IDs.
2077    fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2078        tracing::debug!(?items, "sorting items by dependencies");
2079
2080        let all_item_ids: BTreeSet<_> = items.iter().map(|item| item.0.id).collect();
2081
2082        // For each item, the update that contains it.
2083        let mut updates_by_id =
2084            BTreeMap::<CatalogItemId, (mz_catalog::durable::Item, Timestamp, StateDiff)>::new();
2085        // For each item, the number of unprocessed dependencies.
2086        let mut in_degree = BTreeMap::<CatalogItemId, usize>::new();
2087        // For each item, the IDs of items depending on it.
2088        let mut dependents = BTreeMap::<CatalogItemId, Vec<CatalogItemId>>::new();
2089        // Items that have no unprocessed dependencies.
2090        let mut ready = Vec::new();
2091
2092        // Build the graph.
2093        for (item, ts, diff) in items.drain(..) {
2094            let id = item.id;
2095            let statement = mz_sql::parse::parse(&item.create_sql)
2096                .expect("valid create_sql")
2097                .into_element()
2098                .ast;
2099
2100            let mut dependencies = mz_sql::names::dependencies(&statement)
2101                .expect("failed to find dependencies of item");
2102            // Remove any dependencies not contained in `items`.
2103            // As a defensive measure, also remove any self-references.
2104            dependencies.retain(|dep| all_item_ids.contains(dep) && *dep != id);
2105
2106            let prev = updates_by_id.insert(id, (item, ts, diff));
2107            assert_none!(prev);
2108
2109            in_degree.insert(id, dependencies.len());
2110
2111            for dep_id in &dependencies {
2112                dependents.entry(*dep_id).or_default().push(id);
2113            }
2114
2115            if dependencies.is_empty() {
2116                ready.push(id);
2117            }
2118        }
2119
2120        // Process items in topological order, pushing back into the provided Vec.
2121        while let Some(id) = ready.pop() {
2122            let update = updates_by_id.remove(&id).expect("must exist");
2123            items.push(update);
2124
2125            if let Some(depts) = dependents.get(&id) {
2126                for dept_id in depts {
2127                    let deg = in_degree.get_mut(dept_id).expect("must exist");
2128                    *deg -= 1;
2129                    if *deg == 0 {
2130                        ready.push(*dept_id);
2131                    }
2132                }
2133            }
2134        }
2135
2136        // Cycle detection: if we didn't process all items, there's a cycle.
2137        if !updates_by_id.is_empty() {
2138            panic!("programming error, cycle in item dependencies");
2139        }
2140    }
2141
2142    /// Sort item updates by dependency.
2143    ///
2144    /// First we group items into groups that are totally ordered by dependency. For example, when
2145    /// sorting all items by dependency we know that all tables can come after all sources, because
2146    /// a source can never depend on a table. Second, we sort the items in each group in
2147    /// topological order, or by ID, depending on the type.
2148    ///
2149    /// It used to be the case that the ID order of ALL items matched the dependency order. However,
2150    /// certain migrations shuffled item IDs around s.t. this was no longer true. A much better
2151    /// approach would be to investigate each item, discover their exact dependencies, and then
2152    /// perform a topological sort. This is non-trivial because we only have the CREATE SQL of each
2153    /// item here. Within the SQL the dependent items are sometimes referred to by ID and sometimes
2154    /// referred to by name.
2155    ///
2156    /// The logic of this function should match [`sort_temp_item_updates`].
2157    fn sort_item_updates(
2158        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2159    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2160        // Partition items into groups s.t. each item in one group has a predefined order with all
2161        // items in other groups. For example, all sinks are ordered greater than all tables.
2162        let mut types = Vec::new();
2163        // N.B. Functions can depend on system tables, but not user tables.
2164        // TODO(udf): This will change when UDFs are supported.
2165        let mut funcs = Vec::new();
2166        let mut secrets = Vec::new();
2167        let mut connections = Vec::new();
2168        let mut sources = Vec::new();
2169        let mut tables = Vec::new();
2170        let mut derived_items = Vec::new();
2171        let mut sinks = Vec::new();
2172        let mut continual_tasks = Vec::new();
2173
2174        for update in item_updates {
2175            match update.0.item_type() {
2176                CatalogItemType::Type => types.push(update),
2177                CatalogItemType::Func => funcs.push(update),
2178                CatalogItemType::Secret => secrets.push(update),
2179                CatalogItemType::Connection => connections.push(update),
2180                CatalogItemType::Source => sources.push(update),
2181                CatalogItemType::Table => tables.push(update),
2182                CatalogItemType::View
2183                | CatalogItemType::MaterializedView
2184                | CatalogItemType::Index => derived_items.push(update),
2185                CatalogItemType::Sink => sinks.push(update),
2186                CatalogItemType::ContinualTask => continual_tasks.push(update),
2187            }
2188        }
2189
2190        // For some groups, the items in them can depend on each other and can be `ALTER`ed so that
2191        // an item ends up depending on an item with a greater ID. Thus we need to perform
2192        // topological sort for these groups.
2193        sort_items_topological(&mut connections);
2194        sort_items_topological(&mut derived_items);
2195
2196        // Other groups we can simply sort by ID.
2197        for group in [
2198            &mut types,
2199            &mut funcs,
2200            &mut secrets,
2201            &mut sources,
2202            &mut tables,
2203            &mut sinks,
2204            &mut continual_tasks,
2205        ] {
2206            group.sort_by_key(|(item, _, _)| item.id);
2207        }
2208
2209        iter::empty()
2210            .chain(types)
2211            .chain(funcs)
2212            .chain(secrets)
2213            .chain(connections)
2214            .chain(sources)
2215            .chain(tables)
2216            .chain(derived_items)
2217            .chain(sinks)
2218            .chain(continual_tasks)
2219            .collect()
2220    }
2221
2222    let item_retractions = sort_item_updates(item_retractions);
2223    let item_additions = sort_item_updates(item_additions);
2224
2225    /// Sort temporary item updates by dependency.
2226    ///
2227    /// The logic of this function should match [`sort_item_updates`].
2228    fn sort_temp_item_updates(
2229        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2230    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2231        // Partition items into groups s.t. each item in one group has a predefined order with all
2232        // items in other groups. For example, all sinks are ordered greater than all tables.
2233        let mut types = Vec::new();
2234        // N.B. Functions can depend on system tables, but not user tables.
2235        let mut funcs = Vec::new();
2236        let mut secrets = Vec::new();
2237        let mut connections = Vec::new();
2238        let mut sources = Vec::new();
2239        let mut tables = Vec::new();
2240        let mut derived_items = Vec::new();
2241        let mut sinks = Vec::new();
2242        let mut continual_tasks = Vec::new();
2243
2244        for update in temp_item_updates {
2245            match update.0.item_type() {
2246                CatalogItemType::Type => types.push(update),
2247                CatalogItemType::Func => funcs.push(update),
2248                CatalogItemType::Secret => secrets.push(update),
2249                CatalogItemType::Connection => connections.push(update),
2250                CatalogItemType::Source => sources.push(update),
2251                CatalogItemType::Table => tables.push(update),
2252                CatalogItemType::View
2253                | CatalogItemType::MaterializedView
2254                | CatalogItemType::Index => derived_items.push(update),
2255                CatalogItemType::Sink => sinks.push(update),
2256                CatalogItemType::ContinualTask => continual_tasks.push(update),
2257            }
2258        }
2259
2260        // Within each group, sort by ID.
2261        for group in [
2262            &mut types,
2263            &mut funcs,
2264            &mut secrets,
2265            &mut connections,
2266            &mut sources,
2267            &mut tables,
2268            &mut derived_items,
2269            &mut sinks,
2270            &mut continual_tasks,
2271        ] {
2272            group.sort_by_key(|(item, _, _)| item.id);
2273        }
2274
2275        iter::empty()
2276            .chain(types)
2277            .chain(funcs)
2278            .chain(secrets)
2279            .chain(connections)
2280            .chain(sources)
2281            .chain(tables)
2282            .chain(derived_items)
2283            .chain(sinks)
2284            .chain(continual_tasks)
2285            .collect()
2286    }
2287    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2288    let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2289
2290    /// Merge sorted temporary and non-temp items.
2291    fn merge_item_updates(
2292        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2293        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2294    ) -> Vec<StateUpdate> {
2295        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2296
2297        while let (Some((item, _, _)), Some((temp_item, _, _))) =
2298            (item_updates.front(), temp_item_updates.front())
2299        {
2300            if item.id < temp_item.id {
2301                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2302                state_updates.push(StateUpdate {
2303                    kind: StateUpdateKind::Item(item),
2304                    ts,
2305                    diff,
2306                });
2307            } else if item.id > temp_item.id {
2308                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2309                state_updates.push(StateUpdate {
2310                    kind: StateUpdateKind::TemporaryItem(temp_item),
2311                    ts,
2312                    diff,
2313                });
2314            } else {
2315                unreachable!(
2316                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2317                );
2318            }
2319        }
2320
2321        while let Some((item, ts, diff)) = item_updates.pop_front() {
2322            state_updates.push(StateUpdate {
2323                kind: StateUpdateKind::Item(item),
2324                ts,
2325                diff,
2326            });
2327        }
2328
2329        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2330            state_updates.push(StateUpdate {
2331                kind: StateUpdateKind::TemporaryItem(temp_item),
2332                ts,
2333                diff,
2334            });
2335        }
2336
2337        state_updates
2338    }
2339    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2340    let item_additions = merge_item_updates(item_additions, temp_item_additions);
2341
2342    // Put everything back together.
2343    iter::empty()
2344        // All retractions must be reversed.
2345        .chain(post_item_retractions.into_iter().rev())
2346        .chain(item_retractions.into_iter().rev())
2347        .chain(builtin_index_retractions.into_iter().rev())
2348        .chain(cluster_retractions.into_iter().rev())
2349        .chain(other_builtin_retractions.into_iter().rev())
2350        .chain(pre_cluster_retractions.into_iter().rev())
2351        .chain(pre_cluster_additions)
2352        .chain(other_builtin_additions)
2353        .chain(cluster_additions)
2354        .chain(builtin_index_additions)
2355        .chain(item_additions)
2356        .chain(post_item_additions)
2357        .collect()
2358}
2359
2360/// Groups of updates of certain types are applied in batches to improve
2361/// performance. A constraint is that updates must be applied in order. This
2362/// process is modeled as a state machine that batches then applies groups of
2363/// updates.
2364enum ApplyState {
2365    /// Additions of builtin views.
2366    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
2367    /// Item updates that aren't builtin view additions.
2368    ///
2369    /// This contains all updates whose application requires calling
2370    /// `parse_item` and thus toggling the `enable_for_item_parsing` feature
2371    /// flags.
2372    Items(Vec<StateUpdate>),
2373    /// All other updates.
2374    Updates(Vec<StateUpdate>),
2375}
2376
2377impl ApplyState {
2378    fn new(update: StateUpdate) -> Self {
2379        use StateUpdateKind::*;
2380        match &update.kind {
2381            SystemObjectMapping(som)
2382                if som.description.object_type == CatalogItemType::View
2383                    && update.diff == StateDiff::Addition =>
2384            {
2385                let view_addition = lookup_builtin_view_addition(som.clone());
2386                Self::BuiltinViewAdditions(vec![view_addition])
2387            }
2388
2389            IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
2390                Self::Items(vec![update])
2391            }
2392
2393            Role(_)
2394            | RoleAuth(_)
2395            | Database(_)
2396            | Schema(_)
2397            | DefaultPrivilege(_)
2398            | SystemPrivilege(_)
2399            | SystemConfiguration(_)
2400            | Cluster(_)
2401            | NetworkPolicy(_)
2402            | ClusterReplica(_)
2403            | SourceReferences(_)
2404            | Comment(_)
2405            | AuditLog(_)
2406            | StorageCollectionMetadata(_)
2407            | UnfinalizedShard(_) => Self::Updates(vec![update]),
2408        }
2409    }
2410
2411    /// Apply all updates that have been batched in `self`.
2412    ///
2413    /// We make sure to enable all "enable_for_item_parsing" feature flags when applying item
2414    /// updates during bootstrap. See [`CatalogState::with_enable_for_item_parsing`] for more
2415    /// details.
2416    async fn apply(
2417        self,
2418        state: &mut CatalogState,
2419        retractions: &mut InProgressRetractions,
2420        local_expression_cache: &mut LocalExpressionCache,
2421    ) -> (
2422        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2423        Vec<ParsedStateUpdate>,
2424    ) {
2425        match self {
2426            Self::BuiltinViewAdditions(builtin_view_additions) => {
2427                let restore = state.system_configuration.clone();
2428                state.system_configuration.enable_for_item_parsing();
2429                let builtin_table_updates = CatalogState::parse_builtin_views(
2430                    state,
2431                    builtin_view_additions,
2432                    retractions,
2433                    local_expression_cache,
2434                )
2435                .await;
2436                state.system_configuration = restore;
2437                (builtin_table_updates, Vec::new())
2438            }
2439            Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
2440                state
2441                    .apply_updates_inner(updates, retractions, local_expression_cache)
2442                    .expect("corrupt catalog")
2443            }),
2444            Self::Updates(updates) => state
2445                .apply_updates_inner(updates, retractions, local_expression_cache)
2446                .expect("corrupt catalog"),
2447        }
2448    }
2449
2450    async fn step(
2451        self,
2452        next: Self,
2453        state: &mut CatalogState,
2454        retractions: &mut InProgressRetractions,
2455        local_expression_cache: &mut LocalExpressionCache,
2456    ) -> (
2457        Self,
2458        (
2459            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2460            Vec<ParsedStateUpdate>,
2461        ),
2462    ) {
2463        match (self, next) {
2464            (
2465                Self::BuiltinViewAdditions(mut builtin_view_additions),
2466                Self::BuiltinViewAdditions(next_builtin_view_additions),
2467            ) => {
2468                // Continue batching builtin view additions.
2469                builtin_view_additions.extend(next_builtin_view_additions);
2470                (
2471                    Self::BuiltinViewAdditions(builtin_view_additions),
2472                    (Vec::new(), Vec::new()),
2473                )
2474            }
2475            (Self::Items(mut updates), Self::Items(next_updates)) => {
2476                // Continue batching item updates.
2477                updates.extend(next_updates);
2478                (Self::Items(updates), (Vec::new(), Vec::new()))
2479            }
2480            (Self::Updates(mut updates), Self::Updates(next_updates)) => {
2481                // Continue batching updates.
2482                updates.extend(next_updates);
2483                (Self::Updates(updates), (Vec::new(), Vec::new()))
2484            }
2485            (apply_state, next_apply_state) => {
2486                // Apply the current batch and start batching new apply state.
2487                let updates = apply_state
2488                    .apply(state, retractions, local_expression_cache)
2489                    .await;
2490                (next_apply_state, updates)
2491            }
2492        }
2493    }
2494}
2495
2496/// Helper method to updated inverted lookup maps. The keys are generally names and the values are
2497/// generally IDs.
2498///
2499/// Importantly, when retracting it's expected that the existing value will match `value` exactly.
2500fn apply_inverted_lookup<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: V, diff: StateDiff)
2501where
2502    K: Ord + Clone + Debug,
2503    V: PartialEq + Debug,
2504{
2505    match diff {
2506        StateDiff::Retraction => {
2507            let prev = map.remove(key);
2508            assert_eq!(
2509                prev,
2510                Some(value),
2511                "retraction does not match existing value: {key:?}"
2512            );
2513        }
2514        StateDiff::Addition => {
2515            let prev = map.insert(key.clone(), value);
2516            assert_eq!(
2517                prev, None,
2518                "values must be explicitly retracted before inserting a new value: {key:?}"
2519            );
2520        }
2521    }
2522}
2523
2524/// Helper method to update catalog state, that may need to be updated from a previously retracted
2525/// object.
2526fn apply_with_update<K, V, D>(
2527    map: &mut BTreeMap<K, V>,
2528    durable: D,
2529    key_fn: impl FnOnce(&D) -> K,
2530    diff: StateDiff,
2531    retractions: &mut BTreeMap<D::Key, V>,
2532) where
2533    K: Ord,
2534    V: UpdateFrom<D> + PartialEq + Debug,
2535    D: DurableType,
2536    D::Key: Ord,
2537{
2538    match diff {
2539        StateDiff::Retraction => {
2540            let mem_key = key_fn(&durable);
2541            let value = map
2542                .remove(&mem_key)
2543                .expect("retraction does not match existing value: {key:?}");
2544            let durable_key = durable.into_key_value().0;
2545            retractions.insert(durable_key, value);
2546        }
2547        StateDiff::Addition => {
2548            let mem_key = key_fn(&durable);
2549            let durable_key = durable.key();
2550            let value = match retractions.remove(&durable_key) {
2551                Some(mut retraction) => {
2552                    retraction.update_from(durable);
2553                    retraction
2554                }
2555                None => durable.into(),
2556            };
2557            let prev = map.insert(mem_key, value);
2558            assert_eq!(
2559                prev, None,
2560                "values must be explicitly retracted before inserting a new value"
2561            );
2562        }
2563    }
2564}
2565
2566/// Looks up a [`BuiltinView`] from a [`SystemObjectMapping`].
2567fn lookup_builtin_view_addition(
2568    mapping: SystemObjectMapping,
2569) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2570    let (_, builtin) = BUILTIN_LOOKUP
2571        .get(&mapping.description)
2572        .expect("missing builtin view");
2573    let Builtin::View(view) = builtin else {
2574        unreachable!("programming error, expected BuiltinView found {builtin:?}");
2575    };
2576
2577    (
2578        view,
2579        mapping.unique_identifier.catalog_id,
2580        mapping.unique_identifier.global_id,
2581    )
2582}