Skip to main content

mz_adapter/catalog/
apply.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to applying updates from a [`mz_catalog::durable::DurableCatalogState`] to a
11//! [`CatalogState`].
12
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25    BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28    ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29    SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, Func, Index, Log, NetworkPolicy,
35    Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36    TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
40use mz_controller_types::ClusterId;
41use mz_expr::MirScalarExpr;
42use mz_ore::collections::CollectionExt;
43use mz_ore::tracing::OpenTelemetryContext;
44use mz_ore::{instrument, soft_assert_no_log};
45use mz_pgrepr::oid::INVALID_OID;
46use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
47use mz_repr::role_id::RoleId;
48use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
49use mz_sql::catalog::CatalogError as SqlCatalogError;
50use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
51use mz_sql::names::{
52    FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
53    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
54};
55use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
56use mz_sql::session::vars::{VarError, VarInput};
57use mz_sql::{plan, rbac};
58use mz_sql_parser::ast::Expr;
59use mz_storage_types::sources::Timeline;
60use tracing::{info_span, warn};
61
62use crate::AdapterError;
63use crate::catalog::state::LocalExpressionCache;
64use crate::catalog::{BuiltinTableUpdate, CatalogState};
65use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
66use crate::util::{index_sql, sort_topological};
67
/// Maintains the state of retractions while applying catalog state updates for a single timestamp.
/// [`CatalogState`] maintains denormalized state for certain catalog objects. Updating an object
/// results in applying a retraction for that object followed by applying an addition for that
/// object. When applying those additions it can be extremely expensive to re-build that
/// denormalized state from scratch. To avoid that issue we stash the denormalized state from
/// retractions, so it can be used during additions.
///
/// Not all objects maintain denormalized state, so we only stash the retractions for the subset of
/// objects that maintain denormalized state.
// TODO(jkosh44) It might be simpler or more future proof to include all object types here, even if
// the update step is a no-op for certain types.
#[derive(Debug, Clone, Default)]
struct InProgressRetractions {
    // Retracted roles, keyed by their durable catalog key.
    roles: BTreeMap<RoleKey, Role>,
    // Retracted role authentication state.
    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
    // Retracted databases.
    databases: BTreeMap<DatabaseKey, Database>,
    // Retracted schemas (both database-scoped and ambient).
    schemas: BTreeMap<SchemaKey, Schema>,
    // Retracted clusters.
    clusters: BTreeMap<ClusterKey, Cluster>,
    // Retracted network policies.
    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
    // Retracted durable catalog items.
    items: BTreeMap<ItemKey, CatalogEntry>,
    // Retracted temporary items, keyed by item ID.
    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
    // Retracted introspection source indexes, keyed by item ID.
    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
    // Retracted builtin system objects, keyed by item ID.
    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}
92
93impl CatalogState {
    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
    ///
    /// Returns builtin table updates and parsed catalog updates corresponding to the changes to
    /// catalog state.
    #[must_use]
    #[instrument]
    pub(crate) async fn apply_updates(
        &mut self,
        updates: Vec<StateUpdate>,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
        Vec<ParsedStateUpdate>,
    ) {
        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::with_capacity(updates.len());

        // First, consolidate updates. The code that applies parsed state
        // updates _requires_ that the given updates are consolidated. There
        // must be at most one addition and/or one retraction for a given item,
        // as identified by that items ID type.
        let updates = Self::consolidate_updates(updates);

        // Apply updates in groups, according to their timestamps.
        let mut groups: Vec<Vec<_>> = Vec::new();
        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
            // Bring the updates into the pseudo-topological order that we need
            // for updating our in-memory state and generating builtin table
            // updates.
            let updates = sort_updates(updates.collect());
            groups.push(updates);
        }

        for updates in groups {
            // Retractions stashed here may be reused by paired additions within
            // the same timestamp group; see `InProgressRetractions`.
            let mut apply_state = ApplyState::Updates(Vec::new());
            let mut retractions = InProgressRetractions::default();

            for update in updates {
                // NOTE(review): `ApplyState` is defined elsewhere in this file;
                // each `step` folds one update into the accumulated state and
                // may emit builtin table / catalog updates as it goes.
                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
                    .step(
                        ApplyState::new(update),
                        self,
                        &mut retractions,
                        local_expression_cache,
                    )
                    .await;
                apply_state = next_apply_state;
                builtin_table_updates.extend(builtin_table_update);
                catalog_updates.extend(catalog_update);
            }

            // Apply remaining state.
            let (builtin_table_update, catalog_update) = apply_state
                .apply(self, &mut retractions, local_expression_cache)
                .await;
            builtin_table_updates.extend(builtin_table_update);
            catalog_updates.extend(catalog_update);
        }

        (builtin_table_updates, catalog_updates)
    }
154
155    /// It can happen that the sequencing logic creates "fluctuating" updates
156    /// for a given catalog ID. For example, when doing a `DROP OWNED BY ...`,
157    /// for a table, there will be a retraction of the original table state,
158    /// then an addition for the same table but stripped of some of the roles
159    /// and access things, and then a retraction for that intermediate table
160    /// state. By consolidating, the intermediate state addition/retraction will
161    /// cancel out and we'll only see the retraction for the original state.
162    fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
163        let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
164            .into_iter()
165            .map(|update| (update.kind, update.ts, update.diff.into()))
166            .collect_vec();
167
168        consolidate_updates(&mut updates);
169
170        updates
171            .into_iter()
172            .map(|(kind, ts, diff)| StateUpdate {
173                kind,
174                ts,
175                diff: diff
176                    .try_into()
177                    .expect("catalog state cannot have diff other than -1 or 1"),
178            })
179            .collect_vec()
180    }
181
    /// Applies a batch of updates, all sharing a single timestamp, to the
    /// in-memory catalog state.
    ///
    /// Returns the builtin table updates and parsed catalog updates that
    /// correspond to the applied changes.
    ///
    /// The ordering of side effects is deliberate: for retractions, builtin
    /// table updates and parsed updates are generated *before* mutating the
    /// in-memory state; for additions, *after*. Either way the generated
    /// updates observe a catalog that still/already contains the object.
    #[instrument(level = "debug")]
    fn apply_updates_inner(
        &mut self,
        updates: Vec<StateUpdate>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        soft_assert_no_log!(
            updates.iter().map(|update| update.ts).all_equal(),
            "all timestamps should be equal: {updates:?}"
        );

        // Tracks whether any system configuration changed, so we push the new
        // values to dyncfg exactly once at the end of the batch.
        let mut update_system_config = false;

        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::new();

        for state_update in updates {
            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
                update_system_config = true;
            }

            match state_update.diff {
                StateDiff::Retraction => {
                    // We want the parsed catalog updates to match the state of
                    // the catalog _before_ applying a retraction. So that we can
                    // still have useful in-memory state to work with.
                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }

                    // We want the builtin table retraction to match the state of the catalog
                    // before applying the update.
                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));
                    self.apply_update(
                        state_update.kind,
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                }
                StateDiff::Addition => {
                    self.apply_update(
                        state_update.kind.clone(),
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                    // We want the builtin table addition to match the state of
                    // the catalog after applying the update. So that we already
                    // have useful in-memory state to work with.
                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));

                    // We want the parsed catalog updates to match the state of
                    // the catalog _after_ applying an addition.
                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }
                }
            }
        }

        if update_system_config {
            // Propagate the batched system configuration changes to dyncfg.
            self.system_configuration.dyncfg_updates();
        }

        Ok((builtin_table_updates, catalog_updates))
    }
266
    /// Applies a single update to the in-memory state, dispatching on the kind
    /// of catalog object being updated.
    ///
    /// Retracted state that may be needed when applying a paired addition at
    /// the same timestamp is stashed in `retractions`.
    #[instrument(level = "debug")]
    fn apply_update(
        &mut self,
        kind: StateUpdateKind,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<(), CatalogError> {
        match kind {
            StateUpdateKind::Role(role) => {
                self.apply_role_update(role, diff, retractions);
            }
            StateUpdateKind::RoleAuth(role_auth) => {
                self.apply_role_auth_update(role_auth, diff, retractions);
            }
            StateUpdateKind::Database(database) => {
                self.apply_database_update(database, diff, retractions);
            }
            StateUpdateKind::Schema(schema) => {
                self.apply_schema_update(schema, diff, retractions);
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                self.apply_default_privilege_update(default_privilege, diff, retractions);
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                self.apply_system_privilege_update(system_privilege, diff, retractions);
            }
            StateUpdateKind::SystemConfiguration(system_configuration) => {
                self.apply_system_configuration_update(system_configuration, diff, retractions);
            }
            StateUpdateKind::Cluster(cluster) => {
                self.apply_cluster_update(cluster, diff, retractions);
            }
            StateUpdateKind::NetworkPolicy(network_policy) => {
                self.apply_network_policy_update(network_policy, diff, retractions);
            }
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.apply_introspection_source_index_update(
                    introspection_source_index,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => {
                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
            }
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                self.apply_system_object_mapping_update(
                    system_object_mapping,
                    diff,
                    retractions,
                    local_expression_cache,
                );
            }
            StateUpdateKind::TemporaryItem(item) => {
                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
            }
            StateUpdateKind::Item(item) => {
                // Item updates are the only fallible case: parsing/planning the
                // item's SQL can fail.
                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
            }
            StateUpdateKind::Comment(comment) => {
                self.apply_comment_update(comment, diff, retractions);
            }
            StateUpdateKind::SourceReferences(source_reference) => {
                self.apply_source_references_update(source_reference, diff, retractions);
            }
            StateUpdateKind::AuditLog(_audit_log) => {
                // Audit logs are not stored in-memory.
            }
            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
                self.apply_storage_collection_metadata_update(
                    storage_collection_metadata,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
            }
        }

        Ok(())
    }
350
351    #[instrument(level = "debug")]
352    fn apply_role_auth_update(
353        &mut self,
354        role_auth: mz_catalog::durable::RoleAuth,
355        diff: StateDiff,
356        retractions: &mut InProgressRetractions,
357    ) {
358        apply_with_update(
359            &mut self.role_auth_by_id,
360            role_auth,
361            |role_auth| role_auth.role_id,
362            diff,
363            &mut retractions.role_auths,
364        );
365    }
366
367    #[instrument(level = "debug")]
368    fn apply_role_update(
369        &mut self,
370        role: mz_catalog::durable::Role,
371        diff: StateDiff,
372        retractions: &mut InProgressRetractions,
373    ) {
374        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
375        apply_with_update(
376            &mut self.roles_by_id,
377            role,
378            |role| role.id,
379            diff,
380            &mut retractions.roles,
381        );
382    }
383
384    #[instrument(level = "debug")]
385    fn apply_database_update(
386        &mut self,
387        database: mz_catalog::durable::Database,
388        diff: StateDiff,
389        retractions: &mut InProgressRetractions,
390    ) {
391        apply_inverted_lookup(
392            &mut self.database_by_name,
393            &database.name,
394            database.id,
395            diff,
396        );
397        apply_with_update(
398            &mut self.database_by_id,
399            database,
400            |database| database.id,
401            diff,
402            &mut retractions.databases,
403        );
404    }
405
406    #[instrument(level = "debug")]
407    fn apply_schema_update(
408        &mut self,
409        schema: mz_catalog::durable::Schema,
410        diff: StateDiff,
411        retractions: &mut InProgressRetractions,
412    ) {
413        match &schema.database_id {
414            Some(database_id) => {
415                let db = self
416                    .database_by_id
417                    .get_mut(database_id)
418                    .expect("catalog out of sync");
419                apply_inverted_lookup(&mut db.schemas_by_name, &schema.name, schema.id, diff);
420                apply_with_update(
421                    &mut db.schemas_by_id,
422                    schema,
423                    |schema| schema.id,
424                    diff,
425                    &mut retractions.schemas,
426                );
427            }
428            None => {
429                apply_inverted_lookup(
430                    &mut self.ambient_schemas_by_name,
431                    &schema.name,
432                    schema.id,
433                    diff,
434                );
435                apply_with_update(
436                    &mut self.ambient_schemas_by_id,
437                    schema,
438                    |schema| schema.id,
439                    diff,
440                    &mut retractions.schemas,
441                );
442            }
443        }
444    }
445
446    #[instrument(level = "debug")]
447    fn apply_default_privilege_update(
448        &mut self,
449        default_privilege: mz_catalog::durable::DefaultPrivilege,
450        diff: StateDiff,
451        _retractions: &mut InProgressRetractions,
452    ) {
453        match diff {
454            StateDiff::Addition => Arc::make_mut(&mut self.default_privileges)
455                .grant(default_privilege.object, default_privilege.acl_item),
456            StateDiff::Retraction => Arc::make_mut(&mut self.default_privileges)
457                .revoke(&default_privilege.object, &default_privilege.acl_item),
458        }
459    }
460
461    #[instrument(level = "debug")]
462    fn apply_system_privilege_update(
463        &mut self,
464        system_privilege: MzAclItem,
465        diff: StateDiff,
466        _retractions: &mut InProgressRetractions,
467    ) {
468        match diff {
469            StateDiff::Addition => {
470                Arc::make_mut(&mut self.system_privileges).grant(system_privilege)
471            }
472            StateDiff::Retraction => {
473                Arc::make_mut(&mut self.system_privileges).revoke(&system_privilege)
474            }
475        }
476    }
477
478    #[instrument(level = "debug")]
479    fn apply_system_configuration_update(
480        &mut self,
481        system_configuration: mz_catalog::durable::SystemConfiguration,
482        diff: StateDiff,
483        _retractions: &mut InProgressRetractions,
484    ) {
485        let res = match diff {
486            StateDiff::Addition => self.insert_system_configuration(
487                &system_configuration.name,
488                VarInput::Flat(&system_configuration.value),
489            ),
490            StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
491        };
492        match res {
493            Ok(_) => (),
494            // When system variables are deleted, nothing deletes them from the underlying
495            // durable catalog, which isn't great. Still, we need to be able to ignore
496            // unknown variables.
497            Err(Error {
498                kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
499            }) => {
500                warn!(%name, "unknown system parameter from catalog storage");
501            }
502            Err(e) => panic!("unable to update system variable: {e:?}"),
503        }
504    }
505
506    #[instrument(level = "debug")]
507    fn apply_cluster_update(
508        &mut self,
509        cluster: mz_catalog::durable::Cluster,
510        diff: StateDiff,
511        retractions: &mut InProgressRetractions,
512    ) {
513        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
514        apply_with_update(
515            &mut self.clusters_by_id,
516            cluster,
517            |cluster| cluster.id,
518            diff,
519            &mut retractions.clusters,
520        );
521    }
522
523    #[instrument(level = "debug")]
524    fn apply_network_policy_update(
525        &mut self,
526        policy: mz_catalog::durable::NetworkPolicy,
527        diff: StateDiff,
528        retractions: &mut InProgressRetractions,
529    ) {
530        apply_inverted_lookup(
531            &mut self.network_policies_by_name,
532            &policy.name,
533            policy.id,
534            diff,
535        );
536        apply_with_update(
537            &mut self.network_policies_by_id,
538            policy,
539            |policy| policy.id,
540            diff,
541            &mut retractions.network_policies,
542        );
543    }
544
    /// Adds or removes an introspection source index (a builtin index over a
    /// cluster's log sources) from the in-memory state.
    #[instrument(level = "debug")]
    fn apply_introspection_source_index_update(
        &mut self,
        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        let cluster = self
            .clusters_by_id
            .get_mut(&introspection_source_index.cluster_id)
            .expect("catalog out of sync");
        let log = BUILTIN_LOG_LOOKUP
            .get(introspection_source_index.name.as_str())
            .expect("missing log");
        // Keep the cluster's log variant -> index ID mapping in sync.
        apply_inverted_lookup(
            &mut cluster.log_indexes,
            &log.variant,
            introspection_source_index.index_id,
            diff,
        );

        match diff {
            StateDiff::Addition => {
                if let Some(mut entry) = retractions
                    .introspection_source_indexes
                    .remove(&introspection_source_index.item_id)
                {
                    // This should only happen during startup as a result of builtin migrations. We
                    // create a new index item and replace the old one with it.
                    let (index_name, index) = self.create_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.index_id,
                    );
                    // The retracted entry must describe the same object; only
                    // the item definition itself is replaced.
                    assert_eq!(entry.id, introspection_source_index.item_id);
                    assert_eq!(entry.oid, introspection_source_index.oid);
                    assert_eq!(entry.name, index_name);
                    entry.item = index;
                    self.insert_entry(entry);
                } else {
                    self.insert_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.item_id,
                        introspection_source_index.index_id,
                        introspection_source_index.oid,
                    );
                }
            }
            StateDiff::Retraction => {
                // Stash the dropped entry so a paired addition at the same
                // timestamp can reuse it instead of rebuilding from scratch.
                let entry = self.drop_item(introspection_source_index.item_id);
                retractions
                    .introspection_source_indexes
                    .insert(entry.id, entry);
            }
        }
    }
602
    /// Adds or removes a cluster replica from the in-memory state.
    #[instrument(level = "debug")]
    fn apply_cluster_replica_update(
        &mut self,
        cluster_replica: mz_catalog::durable::ClusterReplica,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        let cluster = self
            .clusters_by_id
            .get(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        let azs = cluster.availability_zones();
        // Turn the durable replica location into a concrete one.
        // NOTE(review): the second argument is an empty list — presumably no
        // extra location constraints apply when replaying durable state;
        // confirm against `concretize_replica_location`'s signature.
        let location = self
            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
            .expect("catalog in unexpected state");
        // Re-fetch the cluster mutably: the shared borrow above had to end
        // before `concretize_replica_location` could borrow `self` again.
        let cluster = self
            .clusters_by_id
            .get_mut(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        // Keep the replica name -> id index in sync for both diff directions.
        apply_inverted_lookup(
            &mut cluster.replica_id_by_name_,
            &cluster_replica.name,
            cluster_replica.replica_id,
            diff,
        );
        match diff {
            StateDiff::Retraction => {
                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
                assert!(
                    prev.is_some(),
                    "retraction does not match existing value: {:?}",
                    cluster_replica.replica_id
                );
            }
            StateDiff::Addition => {
                // Build the in-memory replica config from the durable one.
                let logging = ReplicaLogging {
                    log_logging: cluster_replica.config.logging.log_logging,
                    interval: cluster_replica.config.logging.interval,
                };
                let config = ReplicaConfig {
                    location,
                    compute: ComputeReplicaConfig { logging },
                };
                let mem_cluster_replica = ClusterReplica {
                    name: cluster_replica.name.clone(),
                    cluster_id: cluster_replica.cluster_id,
                    replica_id: cluster_replica.replica_id,
                    config,
                    owner_id: cluster_replica.owner_id,
                };
                let prev = cluster
                    .replicas_by_id_
                    .insert(cluster_replica.replica_id, mem_cluster_replica);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value: {:?}",
                    cluster_replica.replica_id
                );
            }
        }
    }
664
665    #[instrument(level = "debug")]
666    fn apply_system_object_mapping_update(
667        &mut self,
668        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
669        diff: StateDiff,
670        retractions: &mut InProgressRetractions,
671        local_expression_cache: &mut LocalExpressionCache,
672    ) {
673        let item_id = system_object_mapping.unique_identifier.catalog_id;
674        let global_id = system_object_mapping.unique_identifier.global_id;
675
676        if system_object_mapping.unique_identifier.runtime_alterable() {
677            // Runtime-alterable system objects have real entries in the items
678            // collection and so get handled through the normal `insert_item`
679            // and `drop_item` code paths.
680            return;
681        }
682
683        if let StateDiff::Retraction = diff {
684            let entry = self.drop_item(item_id);
685            retractions.system_object_mappings.insert(item_id, entry);
686            return;
687        }
688
689        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
690            // This implies that we updated the fingerprint for some builtin item. The retraction
691            // was parsed, planned, and optimized using the compiled in definition, not the
692            // definition from a previous version. So we can just stick the old entry back into the
693            // catalog.
694            self.insert_entry(entry);
695            return;
696        }
697
698        let builtin = BUILTIN_LOOKUP
699            .get(&system_object_mapping.description)
700            .expect("missing builtin")
701            .1;
702        let schema_name = builtin.schema();
703        let schema_id = self
704            .ambient_schemas_by_name
705            .get(schema_name)
706            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
707        let name = QualifiedItemName {
708            qualifiers: ItemQualifiers {
709                database_spec: ResolvedDatabaseSpecifier::Ambient,
710                schema_spec: SchemaSpecifier::Id(*schema_id),
711            },
712            item: builtin.name().into(),
713        };
714        match builtin {
715            Builtin::Log(log) => {
716                let mut acl_items = vec![rbac::owner_privilege(
717                    mz_sql::catalog::ObjectType::Source,
718                    MZ_SYSTEM_ROLE_ID,
719                )];
720                acl_items.extend_from_slice(&log.access);
721                self.insert_item(
722                    item_id,
723                    log.oid,
724                    name.clone(),
725                    CatalogItem::Log(Log {
726                        variant: log.variant,
727                        global_id,
728                    }),
729                    MZ_SYSTEM_ROLE_ID,
730                    PrivilegeMap::from_mz_acl_items(acl_items),
731                );
732            }
733
734            Builtin::Table(table) => {
735                let mut acl_items = vec![rbac::owner_privilege(
736                    mz_sql::catalog::ObjectType::Table,
737                    MZ_SYSTEM_ROLE_ID,
738                )];
739                acl_items.extend_from_slice(&table.access);
740
741                self.insert_item(
742                    item_id,
743                    table.oid,
744                    name.clone(),
745                    CatalogItem::Table(Table {
746                        create_sql: None,
747                        desc: VersionedRelationDesc::new(table.desc.clone()),
748                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
749                        conn_id: None,
750                        resolved_ids: ResolvedIds::empty(),
751                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
752                            || {
753                                self.system_config()
754                                    .metrics_retention()
755                                    .try_into()
756                                    .expect("invalid metrics retention")
757                            },
758                        ),
759                        is_retained_metrics_object: table.is_retained_metrics_object,
760                        data_source: TableDataSource::TableWrites {
761                            defaults: vec![Expr::null(); table.desc.arity()],
762                        },
763                    }),
764                    MZ_SYSTEM_ROLE_ID,
765                    PrivilegeMap::from_mz_acl_items(acl_items),
766                );
767            }
768            Builtin::Index(index) => {
769                let custom_logical_compaction_window =
770                    index.is_retained_metrics_object.then(|| {
771                        self.system_config()
772                            .metrics_retention()
773                            .try_into()
774                            .expect("invalid metrics retention")
775                    });
776                // Indexes can't be versioned.
777                let versions = BTreeMap::new();
778
779                let item = self
780                    .parse_item(
781                        global_id,
782                        &index.create_sql(),
783                        &versions,
784                        None,
785                        index.is_retained_metrics_object,
786                        custom_logical_compaction_window,
787                        local_expression_cache,
788                        None,
789                    )
790                    .unwrap_or_else(|e| {
791                        panic!(
792                            "internal error: failed to load bootstrap index:\n\
793                                    {}\n\
794                                    error:\n\
795                                    {:?}\n\n\
796                                    make sure that the schema name is specified in the builtin index's create sql statement.",
797                            index.name, e
798                        )
799                    });
800                let CatalogItem::Index(_) = item else {
801                    panic!(
802                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
803                        index.name
804                    );
805                };
806
807                self.insert_item(
808                    item_id,
809                    index.oid,
810                    name,
811                    item,
812                    MZ_SYSTEM_ROLE_ID,
813                    PrivilegeMap::default(),
814                );
815            }
816            Builtin::View(_) => {
817                // parse_views is responsible for inserting all builtin views.
818                unreachable!("views added elsewhere");
819            }
820
821            // Note: Element types must be loaded before array types.
822            Builtin::Type(typ) => {
823                let typ = self.resolve_builtin_type_references(typ);
824                if let CatalogType::Array { element_reference } = typ.details.typ {
825                    let entry = self.get_entry_mut(&element_reference);
826                    let item_type = match &mut entry.item {
827                        CatalogItem::Type(item_type) => item_type,
828                        _ => unreachable!("types can only reference other types"),
829                    };
830                    item_type.details.array_id = Some(item_id);
831                }
832
833                let schema_id = self.resolve_system_schema(typ.schema);
834
835                self.insert_item(
836                    item_id,
837                    typ.oid,
838                    QualifiedItemName {
839                        qualifiers: ItemQualifiers {
840                            database_spec: ResolvedDatabaseSpecifier::Ambient,
841                            schema_spec: SchemaSpecifier::Id(schema_id),
842                        },
843                        item: typ.name.to_owned(),
844                    },
845                    CatalogItem::Type(Type {
846                        create_sql: None,
847                        global_id,
848                        details: typ.details.clone(),
849                        resolved_ids: ResolvedIds::empty(),
850                    }),
851                    MZ_SYSTEM_ROLE_ID,
852                    PrivilegeMap::from_mz_acl_items(vec![
853                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
854                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
855                    ]),
856                );
857            }
858
859            Builtin::Func(func) => {
860                // This OID is never used. `func` has a `Vec` of implementations and
861                // each implementation has its own OID. Those are the OIDs that are
862                // actually used by the system.
863                let oid = INVALID_OID;
864                self.insert_item(
865                    item_id,
866                    oid,
867                    name.clone(),
868                    CatalogItem::Func(Func {
869                        inner: func.inner,
870                        global_id,
871                    }),
872                    MZ_SYSTEM_ROLE_ID,
873                    PrivilegeMap::default(),
874                );
875            }
876
877            Builtin::Source(coll) => {
878                let mut acl_items = vec![rbac::owner_privilege(
879                    mz_sql::catalog::ObjectType::Source,
880                    MZ_SYSTEM_ROLE_ID,
881                )];
882                acl_items.extend_from_slice(&coll.access);
883
884                self.insert_item(
885                    item_id,
886                    coll.oid,
887                    name.clone(),
888                    CatalogItem::Source(Source {
889                        create_sql: None,
890                        data_source: coll.data_source.clone(),
891                        desc: coll.desc.clone(),
892                        global_id,
893                        timeline: Timeline::EpochMilliseconds,
894                        resolved_ids: ResolvedIds::empty(),
895                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
896                            || {
897                                self.system_config()
898                                    .metrics_retention()
899                                    .try_into()
900                                    .expect("invalid metrics retention")
901                            },
902                        ),
903                        is_retained_metrics_object: coll.is_retained_metrics_object,
904                    }),
905                    MZ_SYSTEM_ROLE_ID,
906                    PrivilegeMap::from_mz_acl_items(acl_items),
907                );
908            }
909            Builtin::ContinualTask(ct) => {
910                let mut acl_items = vec![rbac::owner_privilege(
911                    mz_sql::catalog::ObjectType::Source,
912                    MZ_SYSTEM_ROLE_ID,
913                )];
914                acl_items.extend_from_slice(&ct.access);
915                // Continual Tasks can't be versioned.
916                let versions = BTreeMap::new();
917
918                let item = self
919                    .parse_item(
920                        global_id,
921                        &ct.create_sql(),
922                        &versions,
923                        None,
924                        false,
925                        None,
926                        local_expression_cache,
927                        None,
928                    )
929                    .unwrap_or_else(|e| {
930                        panic!(
931                            "internal error: failed to load bootstrap continual task:\n\
932                                    {}\n\
933                                    error:\n\
934                                    {:?}\n\n\
935                                    make sure that the schema name is specified in the builtin continual task's create sql statement.",
936                            ct.name, e
937                        )
938                    });
939                let CatalogItem::ContinualTask(_) = &item else {
940                    panic!(
941                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
942                        ct.name
943                    );
944                };
945
946                self.insert_item(
947                    item_id,
948                    ct.oid,
949                    name,
950                    item,
951                    MZ_SYSTEM_ROLE_ID,
952                    PrivilegeMap::from_mz_acl_items(acl_items),
953                );
954            }
955            Builtin::Connection(connection) => {
956                // Connections can't be versioned.
957                let versions = BTreeMap::new();
958                let mut item = self
959                    .parse_item(
960                        global_id,
961                        connection.sql,
962                        &versions,
963                        None,
964                        false,
965                        None,
966                        local_expression_cache,
967                        None,
968                    )
969                    .unwrap_or_else(|e| {
970                        panic!(
971                            "internal error: failed to load bootstrap connection:\n\
972                                    {}\n\
973                                    error:\n\
974                                    {:?}\n\n\
975                                    make sure that the schema name is specified in the builtin connection's create sql statement.",
976                            connection.name, e
977                        )
978                    });
979                let CatalogItem::Connection(_) = &mut item else {
980                    panic!(
981                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
982                        connection.name
983                    );
984                };
985
986                let mut acl_items = vec![rbac::owner_privilege(
987                    mz_sql::catalog::ObjectType::Connection,
988                    connection.owner_id.clone(),
989                )];
990                acl_items.extend_from_slice(connection.access);
991
992                self.insert_item(
993                    item_id,
994                    connection.oid,
995                    name.clone(),
996                    item,
997                    connection.owner_id.clone(),
998                    PrivilegeMap::from_mz_acl_items(acl_items),
999                );
1000            }
1001        }
1002    }
1003
    /// Applies an update for a temporary (session-scoped) item to the in-memory catalog state.
    ///
    /// On addition, the item's persisted `create_sql` is deserialized into a [`CatalogItem`] and
    /// inserted under the connection's temporary schema (which is created lazily on first use).
    /// If a matching retraction was consumed earlier in the same batch (i.e. this is an update),
    /// the retracted entry is reused and only re-parsed when the SQL actually changed.
    /// On retraction, the entry is dropped and stashed in `retractions` so a paired addition can
    /// reuse it.
    #[instrument(level = "debug")]
    fn apply_temporary_item_update(
        &mut self,
        temporary_item: TemporaryItem,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        match diff {
            StateDiff::Addition => {
                let TemporaryItem {
                    id,
                    oid,
                    global_id,
                    schema_id,
                    name,
                    conn_id,
                    create_sql,
                    owner_id,
                    privileges,
                    extra_versions,
                } = temporary_item;
                // Lazily create the temporary schema if it doesn't exist yet.
                // We need the conn_id to create the schema, and it should always be Some for temp items.
                let temp_conn_id = conn_id
                    .as_ref()
                    .expect("temporary items must have a connection id");
                if !self.temporary_schemas.contains_key(temp_conn_id) {
                    self.create_temporary_schema(temp_conn_id, owner_id)
                        .expect("failed to create temporary schema");
                }
                // Qualify the bare item name with the temp schema's database/schema specifiers.
                let schema = self.find_temp_schema(&schema_id);
                let name = QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: schema.database().clone(),
                        schema_spec: schema.id().clone(),
                    },
                    item: name.clone(),
                };

                let entry = match retractions.temp_items.remove(&id) {
                    Some(mut retraction) => {
                        assert_eq!(retraction.id, id);

                        // We only reparse the SQL if it's changed. Otherwise, we use the existing
                        // item. This is a performance optimization and not needed for correctness.
                        // This makes it difficult to use the `UpdateFrom` trait, but the structure
                        // is still the same as the trait.
                        if retraction.create_sql() != create_sql {
                            let mut catalog_item = self
                                .deserialize_item(
                                    global_id,
                                    &create_sql,
                                    &extra_versions,
                                    local_expression_cache,
                                    Some(retraction.item),
                                )
                                .unwrap_or_else(|e| {
                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
                                });
                            // Have to patch up the item because parsing doesn't
                            // take into account temporary schemas/conn_id.
                            // NOTE(aljoscha): I don't like how we're patching
                            // this in here, but it's but one of the ways in
                            // which temporary items are a bit weird. So, here
                            // we are ...
                            catalog_item.set_conn_id(conn_id);
                            retraction.item = catalog_item;
                        }

                        // Refresh the identity/metadata fields on the reused entry.
                        retraction.id = id;
                        retraction.oid = oid;
                        retraction.name = name;
                        retraction.owner_id = owner_id;
                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
                        retraction
                    }
                    None => {
                        // No prior retraction: this is a brand new temporary item.
                        let mut catalog_item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                None,
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });

                        // Have to patch up the item because parsing doesn't
                        // take into account temporary schemas/conn_id.
                        // NOTE(aljoscha): I don't like how we're patching this
                        // in here, but it's but one of the ways in which
                        // temporary items are a bit weird. So, here we are ...
                        catalog_item.set_conn_id(conn_id);

                        CatalogEntry {
                            item: catalog_item,
                            referenced_by: Vec::new(),
                            used_by: Vec::new(),
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                        }
                    }
                };
                self.insert_entry(entry);
            }
            StateDiff::Retraction => {
                // Stash the dropped entry so a paired addition at the same timestamp
                // can reuse it instead of re-parsing from scratch.
                let entry = self.drop_item(temporary_item.id);
                retractions.temp_items.insert(temporary_item.id, entry);
            }
        }
    }
1121
1122    #[instrument(level = "debug")]
1123    fn apply_item_update(
1124        &mut self,
1125        item: mz_catalog::durable::Item,
1126        diff: StateDiff,
1127        retractions: &mut InProgressRetractions,
1128        local_expression_cache: &mut LocalExpressionCache,
1129    ) -> Result<(), CatalogError> {
1130        match diff {
1131            StateDiff::Addition => {
1132                let key = item.key();
1133                let mz_catalog::durable::Item {
1134                    id,
1135                    oid,
1136                    global_id,
1137                    schema_id,
1138                    name,
1139                    create_sql,
1140                    owner_id,
1141                    privileges,
1142                    extra_versions,
1143                } = item;
1144                let schema = self.find_non_temp_schema(&schema_id);
1145                let name = QualifiedItemName {
1146                    qualifiers: ItemQualifiers {
1147                        database_spec: schema.database().clone(),
1148                        schema_spec: schema.id().clone(),
1149                    },
1150                    item: name.clone(),
1151                };
1152                let entry = match retractions.items.remove(&key) {
1153                    Some(retraction) => {
1154                        assert_eq!(retraction.id, item.id);
1155
1156                        let item = self
1157                            .deserialize_item(
1158                                global_id,
1159                                &create_sql,
1160                                &extra_versions,
1161                                local_expression_cache,
1162                                Some(retraction.item),
1163                            )
1164                            .unwrap_or_else(|e| {
1165                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1166                            });
1167
1168                        CatalogEntry {
1169                            item,
1170                            id,
1171                            oid,
1172                            name,
1173                            owner_id,
1174                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1175                            referenced_by: retraction.referenced_by,
1176                            used_by: retraction.used_by,
1177                        }
1178                    }
1179                    None => {
1180                        let catalog_item = self
1181                            .deserialize_item(
1182                                global_id,
1183                                &create_sql,
1184                                &extra_versions,
1185                                local_expression_cache,
1186                                None,
1187                            )
1188                            .unwrap_or_else(|e| {
1189                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1190                            });
1191                        CatalogEntry {
1192                            item: catalog_item,
1193                            referenced_by: Vec::new(),
1194                            used_by: Vec::new(),
1195                            id,
1196                            oid,
1197                            name,
1198                            owner_id,
1199                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1200                        }
1201                    }
1202                };
1203
1204                self.insert_entry(entry);
1205            }
1206            StateDiff::Retraction => {
1207                let entry = self.drop_item(item.id);
1208                let key = item.into_key_value().0;
1209                retractions.items.insert(key, entry);
1210            }
1211        }
1212        Ok(())
1213    }
1214
1215    #[instrument(level = "debug")]
1216    fn apply_comment_update(
1217        &mut self,
1218        comment: mz_catalog::durable::Comment,
1219        diff: StateDiff,
1220        _retractions: &mut InProgressRetractions,
1221    ) {
1222        match diff {
1223            StateDiff::Addition => {
1224                let prev = Arc::make_mut(&mut self.comments).update_comment(
1225                    comment.object_id,
1226                    comment.sub_component,
1227                    Some(comment.comment),
1228                );
1229                assert_eq!(
1230                    prev, None,
1231                    "values must be explicitly retracted before inserting a new value"
1232                );
1233            }
1234            StateDiff::Retraction => {
1235                let prev = Arc::make_mut(&mut self.comments).update_comment(
1236                    comment.object_id,
1237                    comment.sub_component,
1238                    None,
1239                );
1240                assert_eq!(
1241                    prev,
1242                    Some(comment.comment),
1243                    "retraction does not match existing value: ({:?}, {:?})",
1244                    comment.object_id,
1245                    comment.sub_component,
1246                );
1247            }
1248        }
1249    }
1250
1251    #[instrument(level = "debug")]
1252    fn apply_source_references_update(
1253        &mut self,
1254        source_references: mz_catalog::durable::SourceReferences,
1255        diff: StateDiff,
1256        _retractions: &mut InProgressRetractions,
1257    ) {
1258        match diff {
1259            StateDiff::Addition => {
1260                let prev = self
1261                    .source_references
1262                    .insert(source_references.source_id, source_references.into());
1263                assert!(
1264                    prev.is_none(),
1265                    "values must be explicitly retracted before inserting a new value: {prev:?}"
1266                );
1267            }
1268            StateDiff::Retraction => {
1269                let prev = self.source_references.remove(&source_references.source_id);
1270                assert!(
1271                    prev.is_some(),
1272                    "retraction for a non-existent existing value: {source_references:?}"
1273                );
1274            }
1275        }
1276    }
1277
1278    #[instrument(level = "debug")]
1279    fn apply_storage_collection_metadata_update(
1280        &mut self,
1281        storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1282        diff: StateDiff,
1283        _retractions: &mut InProgressRetractions,
1284    ) {
1285        apply_inverted_lookup(
1286            &mut Arc::make_mut(&mut self.storage_metadata).collection_metadata,
1287            &storage_collection_metadata.id,
1288            storage_collection_metadata.shard,
1289            diff,
1290        );
1291    }
1292
1293    #[instrument(level = "debug")]
1294    fn apply_unfinalized_shard_update(
1295        &mut self,
1296        unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1297        diff: StateDiff,
1298        _retractions: &mut InProgressRetractions,
1299    ) {
1300        match diff {
1301            StateDiff::Addition => {
1302                let newly_inserted = Arc::make_mut(&mut self.storage_metadata)
1303                    .unfinalized_shards
1304                    .insert(unfinalized_shard.shard);
1305                assert!(
1306                    newly_inserted,
1307                    "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1308                );
1309            }
1310            StateDiff::Retraction => {
1311                let removed = Arc::make_mut(&mut self.storage_metadata)
1312                    .unfinalized_shards
1313                    .remove(&unfinalized_shard.shard);
1314                assert!(
1315                    removed,
1316                    "retraction does not match existing value: {unfinalized_shard:?}"
1317                );
1318            }
1319        }
1320    }
1321
1322    /// Generate a list of `BuiltinTableUpdate`s that correspond to a list of updates made to the
1323    /// durable catalog.
1324    #[instrument]
1325    pub(crate) fn generate_builtin_table_updates(
1326        &self,
1327        updates: Vec<StateUpdate>,
1328    ) -> Vec<BuiltinTableUpdate> {
1329        let mut builtin_table_updates = Vec::new();
1330        for StateUpdate { kind, ts: _, diff } in updates {
1331            let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1332            let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1333            builtin_table_updates.extend(builtin_table_update);
1334        }
1335        builtin_table_updates
1336    }
1337
1338    /// Generate a list of `BuiltinTableUpdate`s that correspond to a single update made to the
1339    /// durable catalog.
1340    #[instrument(level = "debug")]
1341    pub(crate) fn generate_builtin_table_update(
1342        &self,
1343        kind: StateUpdateKind,
1344        diff: StateDiff,
1345    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1346        let diff = diff.into();
1347        match kind {
1348            StateUpdateKind::Role(role) => {
1349                let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1350                for group_id in role.membership.map.keys() {
1351                    builtin_table_updates
1352                        .push(self.pack_role_members_update(*group_id, role.id, diff))
1353                }
1354                builtin_table_updates
1355            }
1356            StateUpdateKind::RoleAuth(role_auth) => {
1357                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1358            }
1359            StateUpdateKind::Database(database) => {
1360                vec![self.pack_database_update(&database.id, diff)]
1361            }
1362            StateUpdateKind::Schema(schema) => {
1363                let db_spec = schema.database_id.into();
1364                vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1365            }
1366            StateUpdateKind::DefaultPrivilege(default_privilege) => {
1367                vec![self.pack_default_privileges_update(
1368                    &default_privilege.object,
1369                    &default_privilege.acl_item.grantee,
1370                    &default_privilege.acl_item.acl_mode,
1371                    diff,
1372                )]
1373            }
1374            StateUpdateKind::SystemPrivilege(system_privilege) => {
1375                vec![self.pack_system_privileges_update(system_privilege, diff)]
1376            }
1377            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1378            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1379            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1380                self.pack_item_update(introspection_source_index.item_id, diff)
1381            }
1382            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1383                cluster_replica.cluster_id,
1384                &cluster_replica.name,
1385                diff,
1386            ),
1387            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1388                // Runtime-alterable system objects have real entries in the
1389                // items collection and so get handled through the normal
1390                // `StateUpdateKind::Item`.`
1391                if !system_object_mapping.unique_identifier.runtime_alterable() {
1392                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1393                } else {
1394                    vec![]
1395                }
1396            }
1397            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1398            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1399            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1400                comment.object_id,
1401                comment.sub_component,
1402                &comment.comment,
1403                diff,
1404            )],
1405            StateUpdateKind::SourceReferences(source_references) => {
1406                self.pack_source_references_update(&source_references, diff)
1407            }
1408            StateUpdateKind::AuditLog(audit_log) => {
1409                vec![
1410                    self.pack_audit_log_update(&audit_log.event, diff)
1411                        .expect("could not pack audit log update"),
1412                ]
1413            }
1414            StateUpdateKind::NetworkPolicy(policy) => self
1415                .pack_network_policy_update(&policy.id, diff)
1416                .expect("could not pack audit log update"),
1417            StateUpdateKind::StorageCollectionMetadata(_)
1418            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1419        }
1420    }
1421
1422    fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1423        self.entry_by_id
1424            .get_mut(id)
1425            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1426    }
1427
    /// Returns a mutable reference to the schema identified by `database_spec` and
    /// `schema_spec`, using `conn_id` to resolve temporary schemas.
    ///
    /// Panics if the schema is missing, since that indicates the in-memory state
    /// has diverged from the durable catalog.
    fn get_schema_mut(
        &mut self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &mut Schema {
        // Keep in sync with `get_schemas`
        match (database_spec, schema_spec) {
            // Temporary schemas live in the ambient database and are keyed by connection id.
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
                .temporary_schemas
                .get_mut(conn_id)
                .expect("catalog out of sync"),
            // Ambient (database-less) schemas, e.g. system schemas.
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
                .ambient_schemas_by_id
                .get_mut(id)
                .expect("catalog out of sync"),
            // Ordinary schemas inside a database.
            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
                .database_by_id
                .get_mut(database_id)
                .expect("catalog out of sync")
                .schemas_by_id
                .get_mut(schema_id)
                .expect("catalog out of sync"),
            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
                unreachable!("temporary schemas are in the ambient database")
            }
        }
    }
1456
    /// Install builtin views to the catalog. This is its own function so that views can be
    /// optimized in parallel.
    ///
    /// The implementation is similar to `apply_updates_for_bootstrap` and determines dependency
    /// problems by sniffing out specific errors and then retrying once those dependencies are
    /// complete. This doesn't work for everything (casts, function implementations) so we also need
    /// to have a bucket for everything at the end. Additionally, because this executes in parallel,
    /// we must maintain a completed set otherwise races could result in orphaned views languishing
    /// in awaiting with nothing retriggering the attempt.
    #[instrument(name = "catalog::parse_views")]
    async fn parse_builtin_views(
        state: &mut CatalogState,
        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
        // Views with a pending retraction (`updates`) can reuse the retracted entry;
        // everything else (`additions`) must be parsed and optimized from scratch.
        let (updates, additions): (Vec<_>, Vec<_>) =
            builtin_views
                .into_iter()
                .partition_map(|(view, item_id, gid)| {
                    match retractions.system_object_mappings.remove(&item_id) {
                        Some(entry) => Either::Left(entry),
                        None => Either::Right((view, item_id, gid)),
                    }
                });

        for entry in updates {
            // This implies that we updated the fingerprint for some builtin view. The retraction
            // was parsed, planned, and optimized using the compiled in definition, not the
            // definition from a previous version. So we can just stick the old entry back into the
            // catalog.
            let item_id = entry.id();
            state.insert_entry(entry);
            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
        }

        let mut handles = Vec::new();
        // Views blocked on a specific missing dependency, keyed by that dependency's ID.
        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
            BTreeMap::new();
        // Same, but keyed by the missing dependency's full name when no ID is available.
        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
        // Some errors are due to the implementation of casts or SQL functions that depend on some
        // view. Instead of figuring out the exact view dependency, delay these until the end.
        let mut awaiting_all = Vec::new();
        // Completed views, needed to avoid race conditions.
        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
        let mut completed_names: BTreeSet<String> = BTreeSet::new();

        // Avoid some reference lifetime issues by not passing `builtin` into the spawned task.
        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
            .into_iter()
            .map(|(view, item_id, gid)| (item_id, (view, gid)))
            .collect();
        let item_ids: Vec<_> = views.keys().copied().collect();

        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
            if handles.is_empty() && ready.is_empty() {
                // Enqueue the views that were waiting for all the others.
                ready.extend(awaiting_all.drain(..));
            }

            // Spawn tasks for all ready views.
            if !ready.is_empty() {
                let spawn_state = Arc::new(state.clone());
                while let Some(id) = ready.pop_front() {
                    let (view, global_id) = views.get(&id).expect("must exist");
                    let global_id = *global_id;
                    let create_sql = view.create_sql();
                    // Views can't be versioned.
                    let versions = BTreeMap::new();

                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
                    let task_state = Arc::clone(&spawn_state);
                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
                    let handle = mz_ore::task::spawn_blocking(
                        || "parse view",
                        move || {
                            span.in_scope(|| {
                                let res = task_state.parse_item_inner(
                                    global_id,
                                    &create_sql,
                                    &versions,
                                    None,
                                    false,
                                    None,
                                    cached_expr,
                                    None,
                                );
                                (id, global_id, res)
                            })
                        },
                    );
                    handles.push(handle);
                }
            }

            // Wait for a view to be ready.
            let (selected, _idx, remaining) = future::select_all(handles).await;
            handles = remaining;
            let (id, global_id, res) = selected;
            // On failure, return the cached expression to the cache so a later retry
            // of this view doesn't need to re-optimize it.
            let mut insert_cached_expr = |cached_expr| {
                if let Some(cached_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
                }
            };
            match res {
                Ok((item, uncached_expr)) => {
                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                        local_expression_cache.insert_uncached_expression(
                            global_id,
                            uncached_expr,
                            optimizer_features,
                        );
                    }
                    // Add item to catalog.
                    let (view, _gid) = views.remove(&id).expect("must exist");
                    let schema_id = state
                        .ambient_schemas_by_name
                        .get(view.schema)
                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
                    let qname = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(*schema_id),
                        },
                        item: view.name.into(),
                    };
                    let mut acl_items = vec![rbac::owner_privilege(
                        mz_sql::catalog::ObjectType::View,
                        MZ_SYSTEM_ROLE_ID,
                    )];
                    acl_items.extend_from_slice(&view.access);

                    state.insert_item(
                        id,
                        view.oid,
                        qname,
                        item,
                        MZ_SYSTEM_ROLE_ID,
                        PrivilegeMap::from_mz_acl_items(acl_items),
                    );

                    // Enqueue any items waiting on this dependency.
                    let mut resolved_dependent_items = Vec::new();
                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    let entry = state.get_entry(&id);
                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    ready.extend(resolved_dependent_items);

                    completed_ids.insert(id);
                    completed_names.insert(full_name);
                }
                // If we were missing a dependency, wait for it to be added.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    // Race guard: the dependency may have completed while this
                    // view's task was in flight, so check before parking.
                    if completed_ids.contains(&missing_dep) {
                        ready.push_back(id);
                    } else {
                        awaiting_id_dependencies
                            .entry(missing_dep)
                            .or_default()
                            .push(id);
                    }
                }
                // If we were missing a dependency, wait for it to be added.
                Err((
                    AdapterError::PlanError(plan::PlanError::Catalog(
                        SqlCatalogError::UnknownItem(missing_dep),
                    )),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    // The unknown item may be reported by ID or by name; try ID first.
                    match CatalogItemId::from_str(&missing_dep) {
                        Ok(missing_dep) => {
                            if completed_ids.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_id_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                        Err(_) => {
                            if completed_names.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_name_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                    }
                }
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    // Cast errors don't name the missing view; retry this one after
                    // everything else has finished.
                    awaiting_all.push(id);
                }
                Err((e, _)) => {
                    let (bad_view, _gid) = views.get(&id).expect("must exist");
                    panic!(
                        "internal error: failed to load bootstrap view:\n\
                            {name}\n\
                            error:\n\
                            {e:?}\n\n\
                            Make sure that the schema name is specified in the builtin view's create sql statement.
                            ",
                        name = bad_view.name,
                    )
                }
            }
        }

        // Every dependency bucket must have drained; anything left over would be a
        // view that was silently dropped.
        assert!(awaiting_id_dependencies.is_empty());
        assert!(
            awaiting_name_dependencies.is_empty(),
            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
        );
        assert!(awaiting_all.is_empty());
        assert!(views.is_empty());

        // Generate a builtin table update for all the new views.
        builtin_table_updates.extend(
            item_ids
                .into_iter()
                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
        );

        builtin_table_updates
    }
1701
1702    /// Associates a name, `CatalogItemId`, and entry.
1703    fn insert_entry(&mut self, entry: CatalogEntry) {
1704        if !entry.id.is_system() {
1705            if let Some(cluster_id) = entry.item.cluster_id() {
1706                self.clusters_by_id
1707                    .get_mut(&cluster_id)
1708                    .expect("catalog out of sync")
1709                    .bound_objects
1710                    .insert(entry.id);
1711            };
1712        }
1713
1714        for u in entry.references().items() {
1715            match self.entry_by_id.get_mut(u) {
1716                Some(metadata) => metadata.referenced_by.push(entry.id()),
1717                None => panic!(
1718                    "Catalog: missing dependent catalog item {} while installing {}",
1719                    &u,
1720                    self.resolve_full_name(entry.name(), entry.conn_id())
1721                ),
1722            }
1723        }
1724        for u in entry.uses() {
1725            // Ignore self for self-referential tasks (e.g. Continual Tasks), if
1726            // present.
1727            if u == entry.id() {
1728                continue;
1729            }
1730            match self.entry_by_id.get_mut(&u) {
1731                Some(metadata) => metadata.used_by.push(entry.id()),
1732                None => panic!(
1733                    "Catalog: missing dependent catalog item {} while installing {}",
1734                    &u,
1735                    self.resolve_full_name(entry.name(), entry.conn_id())
1736                ),
1737            }
1738        }
1739        for gid in entry.item.global_ids() {
1740            self.entry_by_global_id.insert(gid, entry.id());
1741        }
1742        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1743        // Lazily create the temporary schema if this is a temporary item and the schema
1744        // doesn't exist yet.
1745        if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
1746            && !self.temporary_schemas.contains_key(conn_id)
1747        {
1748            self.create_temporary_schema(conn_id, entry.owner_id)
1749                .expect("failed to create temporary schema");
1750        }
1751        let schema = self.get_schema_mut(
1752            &entry.name().qualifiers.database_spec,
1753            &entry.name().qualifiers.schema_spec,
1754            conn_id,
1755        );
1756
1757        let prev_id = match entry.item() {
1758            CatalogItem::Func(_) => schema
1759                .functions
1760                .insert(entry.name().item.clone(), entry.id()),
1761            CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1762            _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1763        };
1764
1765        assert!(
1766            prev_id.is_none(),
1767            "builtin name collision on {:?}",
1768            entry.name().item.clone()
1769        );
1770
1771        self.entry_by_id.insert(entry.id(), entry.clone());
1772    }
1773
1774    /// Associates a name, [`CatalogItemId`], and entry.
1775    fn insert_item(
1776        &mut self,
1777        id: CatalogItemId,
1778        oid: u32,
1779        name: QualifiedItemName,
1780        item: CatalogItem,
1781        owner_id: RoleId,
1782        privileges: PrivilegeMap,
1783    ) {
1784        let entry = CatalogEntry {
1785            item,
1786            name,
1787            id,
1788            oid,
1789            used_by: Vec::new(),
1790            referenced_by: Vec::new(),
1791            owner_id,
1792            privileges,
1793        };
1794
1795        self.insert_entry(entry);
1796    }
1797
    /// Removes the entry for `id` from the catalog and returns it, undoing the
    /// bookkeeping established at insertion time: reverse dependency edges,
    /// global ID lookups, the schema namespace entry, and (for user items) the
    /// cluster binding.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not present, or if any of the associated bookkeeping
    /// is missing — both indicate the catalog is out of sync.
    #[mz_ore::instrument(level = "trace")]
    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
        // Remove this item from the reverse-dependency lists of everything it
        // referenced or used.
        for u in metadata.references().items() {
            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
            }
        }
        for u in metadata.uses() {
            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
                dep_metadata.used_by.retain(|u| *u != metadata.id())
            }
        }
        for gid in metadata.global_ids() {
            self.entry_by_global_id.remove(&gid);
        }

        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema = self.get_schema_mut(
            &metadata.name().qualifiers.database_spec,
            &metadata.name().qualifiers.schema_spec,
            conn_id,
        );
        // Types live in a separate namespace within the schema.
        if metadata.item_type() == CatalogItemType::Type {
            schema
                .types
                .remove(&metadata.name().item)
                .expect("catalog out of sync");
        } else {
            // Functions would need special handling, but we don't yet support
            // dropping functions.
            assert_ne!(metadata.item_type(), CatalogItemType::Func);

            schema
                .items
                .remove(&metadata.name().item)
                .expect("catalog out of sync");
        };

        // Mirror of the cluster binding made in `insert_entry` for user items.
        if !id.is_system() {
            if let Some(cluster_id) = metadata.item().cluster_id() {
                assert!(
                    self.clusters_by_id
                        .get_mut(&cluster_id)
                        .expect("catalog out of sync")
                        .bound_objects
                        .remove(&id),
                    "catalog out of sync"
                );
            }
        }

        metadata
    }
1852
1853    fn insert_introspection_source_index(
1854        &mut self,
1855        cluster_id: ClusterId,
1856        log: &'static BuiltinLog,
1857        item_id: CatalogItemId,
1858        global_id: GlobalId,
1859        oid: u32,
1860    ) {
1861        let (index_name, index) =
1862            self.create_introspection_source_index(cluster_id, log, global_id);
1863        self.insert_item(
1864            item_id,
1865            oid,
1866            index_name,
1867            index,
1868            MZ_SYSTEM_ROLE_ID,
1869            PrivilegeMap::default(),
1870        );
1871    }
1872
1873    fn create_introspection_source_index(
1874        &self,
1875        cluster_id: ClusterId,
1876        log: &'static BuiltinLog,
1877        global_id: GlobalId,
1878    ) -> (QualifiedItemName, CatalogItem) {
1879        let source_name = FullItemName {
1880            database: RawDatabaseSpecifier::Ambient,
1881            schema: log.schema.into(),
1882            item: log.name.into(),
1883        };
1884        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1885        let mut index_name = QualifiedItemName {
1886            qualifiers: ItemQualifiers {
1887                database_spec: ResolvedDatabaseSpecifier::Ambient,
1888                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1889            },
1890            item: index_name.clone(),
1891        };
1892        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1893        let index_item_name = index_name.item.clone();
1894        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1895        let index = CatalogItem::Index(Index {
1896            global_id,
1897            on: log_global_id,
1898            keys: log
1899                .variant
1900                .index_by()
1901                .into_iter()
1902                .map(MirScalarExpr::column)
1903                .collect(),
1904            create_sql: index_sql(
1905                index_item_name,
1906                cluster_id,
1907                source_name,
1908                &log.variant.desc(),
1909                &log.variant.index_by(),
1910            ),
1911            conn_id: None,
1912            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1913            cluster_id,
1914            is_retained_metrics_object: false,
1915            custom_logical_compaction_window: None,
1916        });
1917        (index_name, index)
1918    }
1919
1920    /// Insert system configuration `name` with `value`.
1921    ///
1922    /// Return a `bool` value indicating whether the configuration was modified
1923    /// by the call.
1924    fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1925        Ok(Arc::make_mut(&mut self.system_configuration).set(name, value)?)
1926    }
1927
1928    /// Reset system configuration `name`.
1929    ///
1930    /// Return a `bool` value indicating whether the configuration was modified
1931    /// by the call.
1932    fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1933        Ok(Arc::make_mut(&mut self.system_configuration).reset(name)?)
1934    }
1935}
1936
1937/// Sort [`StateUpdate`]s in dependency order.
1938///
1939/// # Panics
1940///
1941/// This function assumes that all provided `updates` have the same timestamp and will panic
1942/// otherwise. It also requires that the provided `updates` are consolidated, i.e. all contained
1943/// `StateUpdateKinds` are unique.
1944fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1945    fn push_update<T>(
1946        update: T,
1947        diff: StateDiff,
1948        retractions: &mut Vec<T>,
1949        additions: &mut Vec<T>,
1950    ) {
1951        match diff {
1952            StateDiff::Retraction => retractions.push(update),
1953            StateDiff::Addition => additions.push(update),
1954        }
1955    }
1956
1957    soft_assert_no_log!(
1958        updates.iter().map(|update| update.ts).all_equal(),
1959        "all timestamps should be equal: {updates:?}"
1960    );
1961    soft_assert_no_log!(
1962        {
1963            let mut dedup = BTreeSet::new();
1964            updates.iter().all(|update| dedup.insert(&update.kind))
1965        },
1966        "updates should be consolidated: {updates:?}"
1967    );
1968
1969    // Partition updates by type so that we can weave different update types into the right spots.
1970    let mut pre_cluster_retractions = Vec::new();
1971    let mut pre_cluster_additions = Vec::new();
1972    let mut cluster_retractions = Vec::new();
1973    let mut cluster_additions = Vec::new();
1974    let mut builtin_item_updates = Vec::new();
1975    let mut item_retractions = Vec::new();
1976    let mut item_additions = Vec::new();
1977    let mut temp_item_retractions = Vec::new();
1978    let mut temp_item_additions = Vec::new();
1979    let mut post_item_retractions = Vec::new();
1980    let mut post_item_additions = Vec::new();
1981    for update in updates {
1982        let diff = update.diff.clone();
1983        match update.kind {
1984            StateUpdateKind::Role(_)
1985            | StateUpdateKind::RoleAuth(_)
1986            | StateUpdateKind::Database(_)
1987            | StateUpdateKind::Schema(_)
1988            | StateUpdateKind::DefaultPrivilege(_)
1989            | StateUpdateKind::SystemPrivilege(_)
1990            | StateUpdateKind::SystemConfiguration(_)
1991            | StateUpdateKind::NetworkPolicy(_) => push_update(
1992                update,
1993                diff,
1994                &mut pre_cluster_retractions,
1995                &mut pre_cluster_additions,
1996            ),
1997            StateUpdateKind::Cluster(_)
1998            | StateUpdateKind::IntrospectionSourceIndex(_)
1999            | StateUpdateKind::ClusterReplica(_) => push_update(
2000                update,
2001                diff,
2002                &mut cluster_retractions,
2003                &mut cluster_additions,
2004            ),
2005            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
2006                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
2007            }
2008            StateUpdateKind::TemporaryItem(item) => push_update(
2009                (item, update.ts, update.diff),
2010                diff,
2011                &mut temp_item_retractions,
2012                &mut temp_item_additions,
2013            ),
2014            StateUpdateKind::Item(item) => push_update(
2015                (item, update.ts, update.diff),
2016                diff,
2017                &mut item_retractions,
2018                &mut item_additions,
2019            ),
2020            StateUpdateKind::Comment(_)
2021            | StateUpdateKind::SourceReferences(_)
2022            | StateUpdateKind::AuditLog(_)
2023            | StateUpdateKind::StorageCollectionMetadata(_)
2024            | StateUpdateKind::UnfinalizedShard(_) => push_update(
2025                update,
2026                diff,
2027                &mut post_item_retractions,
2028                &mut post_item_additions,
2029            ),
2030        }
2031    }
2032
2033    // Sort builtin item updates by dependency.
2034    let builtin_item_updates = builtin_item_updates
2035        .into_iter()
2036        .map(|(system_object_mapping, ts, diff)| {
2037            let idx = BUILTIN_LOOKUP
2038                .get(&system_object_mapping.description)
2039                .expect("missing builtin")
2040                .0;
2041            (idx, system_object_mapping, ts, diff)
2042        })
2043        .sorted_by_key(|(idx, _, _, _)| *idx)
2044        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2045
2046    // Further partition builtin item updates.
2047    let mut other_builtin_retractions = Vec::new();
2048    let mut other_builtin_additions = Vec::new();
2049    let mut builtin_index_retractions = Vec::new();
2050    let mut builtin_index_additions = Vec::new();
2051    for (builtin_item_update, ts, diff) in builtin_item_updates {
2052        match &builtin_item_update.description.object_type {
2053            CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
2054                StateUpdate {
2055                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2056                    ts,
2057                    diff,
2058                },
2059                diff,
2060                &mut builtin_index_retractions,
2061                &mut builtin_index_additions,
2062            ),
2063            CatalogItemType::Table
2064            | CatalogItemType::Source
2065            | CatalogItemType::Sink
2066            | CatalogItemType::View
2067            | CatalogItemType::MaterializedView
2068            | CatalogItemType::Type
2069            | CatalogItemType::Func
2070            | CatalogItemType::Secret
2071            | CatalogItemType::Connection => push_update(
2072                StateUpdate {
2073                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2074                    ts,
2075                    diff,
2076                },
2077                diff,
2078                &mut other_builtin_retractions,
2079                &mut other_builtin_additions,
2080            ),
2081        }
2082    }
2083
2084    /// Sort items by their dependencies using topological sort.
2085    ///
2086    /// # Panics
2087    ///
2088    /// This function requires that all provided items have unique item IDs.
2089    fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2090        tracing::debug!(?items, "sorting items by dependencies");
2091
2092        let key_fn = |item: &(mz_catalog::durable::Item, _, _)| item.0.id;
2093        let dependencies_fn = |item: &(mz_catalog::durable::Item, _, _)| {
2094            let statement = mz_sql::parse::parse(&item.0.create_sql)
2095                .expect("valid create_sql")
2096                .into_element()
2097                .ast;
2098            mz_sql::names::dependencies(&statement).expect("failed to find dependencies of item")
2099        };
2100        sort_topological(items, key_fn, dependencies_fn);
2101    }
2102
    /// Sort item updates by dependency.
    ///
    /// First we group items into groups that are totally ordered by dependency. For example, when
    /// sorting all items by dependency we know that all tables can come after all sources, because
    /// a source can never depend on a table. Second, we sort the items in each group in
    /// topological order, or by ID, depending on the type.
    ///
    /// It used to be the case that the ID order of ALL items matched the dependency order. However,
    /// certain migrations shuffled item IDs around s.t. this was no longer true. A much better
    /// approach would be to investigate each item, discover their exact dependencies, and then
    /// perform a topological sort. This is non-trivial because we only have the CREATE SQL of each
    /// item here. Within the SQL the dependent items are sometimes referred to by ID and sometimes
    /// referred to by name.
    ///
    /// The logic of this function should match [`sort_temp_item_updates`].
    fn sort_item_updates(
        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
        // Partition items into groups s.t. each item in one group has a predefined order with all
        // items in other groups. For example, all sinks are ordered greater than all tables.
        let mut types = Vec::new();
        // N.B. Functions can depend on system tables, but not user tables.
        // TODO(udf): This will change when UDFs are supported.
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        // Views, materialized views, and indexes share one group because they
        // can all depend on each other.
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in item_updates {
            match update.0.item_type() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        // For some groups, the items in them can depend on each other and can be `ALTER`ed so that
        // an item ends up depending on an item with a greater ID. Thus we need to perform
        // topological sort for these groups.
        sort_items_topological(&mut connections);
        sort_items_topological(&mut derived_items);

        // Other groups we can simply sort by ID.
        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut sources,
            &mut tables,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        // Concatenate the groups in dependency order: later groups may depend
        // on earlier ones, never the reverse.
        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }
2182
2183    let item_retractions = sort_item_updates(item_retractions);
2184    let item_additions = sort_item_updates(item_additions);
2185
2186    /// Sort temporary item updates by dependency.
2187    ///
2188    /// The logic of this function should match [`sort_item_updates`].
2189    fn sort_temp_item_updates(
2190        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2191    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2192        // Partition items into groups s.t. each item in one group has a predefined order with all
2193        // items in other groups. For example, all sinks are ordered greater than all tables.
2194        let mut types = Vec::new();
2195        // N.B. Functions can depend on system tables, but not user tables.
2196        let mut funcs = Vec::new();
2197        let mut secrets = Vec::new();
2198        let mut connections = Vec::new();
2199        let mut sources = Vec::new();
2200        let mut tables = Vec::new();
2201        let mut derived_items = Vec::new();
2202        let mut sinks = Vec::new();
2203        let mut continual_tasks = Vec::new();
2204
2205        for update in temp_item_updates {
2206            match update.0.item_type() {
2207                CatalogItemType::Type => types.push(update),
2208                CatalogItemType::Func => funcs.push(update),
2209                CatalogItemType::Secret => secrets.push(update),
2210                CatalogItemType::Connection => connections.push(update),
2211                CatalogItemType::Source => sources.push(update),
2212                CatalogItemType::Table => tables.push(update),
2213                CatalogItemType::View
2214                | CatalogItemType::MaterializedView
2215                | CatalogItemType::Index => derived_items.push(update),
2216                CatalogItemType::Sink => sinks.push(update),
2217                CatalogItemType::ContinualTask => continual_tasks.push(update),
2218            }
2219        }
2220
2221        // Within each group, sort by ID.
2222        for group in [
2223            &mut types,
2224            &mut funcs,
2225            &mut secrets,
2226            &mut connections,
2227            &mut sources,
2228            &mut tables,
2229            &mut derived_items,
2230            &mut sinks,
2231            &mut continual_tasks,
2232        ] {
2233            group.sort_by_key(|(item, _, _)| item.id);
2234        }
2235
2236        iter::empty()
2237            .chain(types)
2238            .chain(funcs)
2239            .chain(secrets)
2240            .chain(connections)
2241            .chain(sources)
2242            .chain(tables)
2243            .chain(derived_items)
2244            .chain(sinks)
2245            .chain(continual_tasks)
2246            .collect()
2247    }
2248    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2249    let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2250
2251    /// Merge sorted temporary and non-temp items.
2252    fn merge_item_updates(
2253        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2254        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2255    ) -> Vec<StateUpdate> {
2256        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2257
2258        while let (Some((item, _, _)), Some((temp_item, _, _))) =
2259            (item_updates.front(), temp_item_updates.front())
2260        {
2261            if item.id < temp_item.id {
2262                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2263                state_updates.push(StateUpdate {
2264                    kind: StateUpdateKind::Item(item),
2265                    ts,
2266                    diff,
2267                });
2268            } else if item.id > temp_item.id {
2269                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2270                state_updates.push(StateUpdate {
2271                    kind: StateUpdateKind::TemporaryItem(temp_item),
2272                    ts,
2273                    diff,
2274                });
2275            } else {
2276                unreachable!(
2277                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2278                );
2279            }
2280        }
2281
2282        while let Some((item, ts, diff)) = item_updates.pop_front() {
2283            state_updates.push(StateUpdate {
2284                kind: StateUpdateKind::Item(item),
2285                ts,
2286                diff,
2287            });
2288        }
2289
2290        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2291            state_updates.push(StateUpdate {
2292                kind: StateUpdateKind::TemporaryItem(temp_item),
2293                ts,
2294                diff,
2295            });
2296        }
2297
2298        state_updates
2299    }
2300    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2301    let item_additions = merge_item_updates(item_additions, temp_item_additions);
2302
2303    // Put everything back together.
2304    iter::empty()
2305        // All retractions must be reversed.
2306        .chain(post_item_retractions.into_iter().rev())
2307        .chain(item_retractions.into_iter().rev())
2308        .chain(builtin_index_retractions.into_iter().rev())
2309        .chain(cluster_retractions.into_iter().rev())
2310        .chain(other_builtin_retractions.into_iter().rev())
2311        .chain(pre_cluster_retractions.into_iter().rev())
2312        .chain(pre_cluster_additions)
2313        .chain(other_builtin_additions)
2314        .chain(cluster_additions)
2315        .chain(builtin_index_additions)
2316        .chain(item_additions)
2317        .chain(post_item_additions)
2318        .collect()
2319}
2320
/// Groups of updates of certain types are applied in batches to improve
/// performance. A constraint is that updates must be applied in order. This
/// process is modeled as a state machine that batches then applies groups of
/// updates.
enum ApplyState {
    /// Additions of builtin views.
    ///
    /// Batched separately so that `apply` can hand them to
    /// `CatalogState::parse_builtin_views` in bulk.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Item updates that aren't builtin view additions.
    ///
    /// This contains all updates whose application requires calling
    /// `parse_item` and thus toggling the `enable_for_item_parsing` feature
    /// flags.
    Items(Vec<StateUpdate>),
    /// All other updates; applied via `apply_updates_inner` without any
    /// feature-flag toggling.
    Updates(Vec<StateUpdate>),
}
2337
2338impl ApplyState {
2339    fn new(update: StateUpdate) -> Self {
2340        use StateUpdateKind::*;
2341        match &update.kind {
2342            SystemObjectMapping(som)
2343                if som.description.object_type == CatalogItemType::View
2344                    && update.diff == StateDiff::Addition =>
2345            {
2346                let view_addition = lookup_builtin_view_addition(som.clone());
2347                Self::BuiltinViewAdditions(vec![view_addition])
2348            }
2349
2350            IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
2351                Self::Items(vec![update])
2352            }
2353
2354            Role(_)
2355            | RoleAuth(_)
2356            | Database(_)
2357            | Schema(_)
2358            | DefaultPrivilege(_)
2359            | SystemPrivilege(_)
2360            | SystemConfiguration(_)
2361            | Cluster(_)
2362            | NetworkPolicy(_)
2363            | ClusterReplica(_)
2364            | SourceReferences(_)
2365            | Comment(_)
2366            | AuditLog(_)
2367            | StorageCollectionMetadata(_)
2368            | UnfinalizedShard(_) => Self::Updates(vec![update]),
2369        }
2370    }
2371
2372    /// Apply all updates that have been batched in `self`.
2373    ///
2374    /// We make sure to enable all "enable_for_item_parsing" feature flags when applying item
2375    /// updates during bootstrap. See [`CatalogState::with_enable_for_item_parsing`] for more
2376    /// details.
2377    async fn apply(
2378        self,
2379        state: &mut CatalogState,
2380        retractions: &mut InProgressRetractions,
2381        local_expression_cache: &mut LocalExpressionCache,
2382    ) -> (
2383        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2384        Vec<ParsedStateUpdate>,
2385    ) {
2386        match self {
2387            Self::BuiltinViewAdditions(builtin_view_additions) => {
2388                let restore = Arc::clone(&state.system_configuration);
2389                Arc::make_mut(&mut state.system_configuration).enable_for_item_parsing();
2390                let builtin_table_updates = CatalogState::parse_builtin_views(
2391                    state,
2392                    builtin_view_additions,
2393                    retractions,
2394                    local_expression_cache,
2395                )
2396                .await;
2397                state.system_configuration = restore;
2398                (builtin_table_updates, Vec::new())
2399            }
2400            Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
2401                state
2402                    .apply_updates_inner(updates, retractions, local_expression_cache)
2403                    .expect("corrupt catalog")
2404            }),
2405            Self::Updates(updates) => state
2406                .apply_updates_inner(updates, retractions, local_expression_cache)
2407                .expect("corrupt catalog"),
2408        }
2409    }
2410
2411    async fn step(
2412        self,
2413        next: Self,
2414        state: &mut CatalogState,
2415        retractions: &mut InProgressRetractions,
2416        local_expression_cache: &mut LocalExpressionCache,
2417    ) -> (
2418        Self,
2419        (
2420            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2421            Vec<ParsedStateUpdate>,
2422        ),
2423    ) {
2424        match (self, next) {
2425            (
2426                Self::BuiltinViewAdditions(mut builtin_view_additions),
2427                Self::BuiltinViewAdditions(next_builtin_view_additions),
2428            ) => {
2429                // Continue batching builtin view additions.
2430                builtin_view_additions.extend(next_builtin_view_additions);
2431                (
2432                    Self::BuiltinViewAdditions(builtin_view_additions),
2433                    (Vec::new(), Vec::new()),
2434                )
2435            }
2436            (Self::Items(mut updates), Self::Items(next_updates)) => {
2437                // Continue batching item updates.
2438                updates.extend(next_updates);
2439                (Self::Items(updates), (Vec::new(), Vec::new()))
2440            }
2441            (Self::Updates(mut updates), Self::Updates(next_updates)) => {
2442                // Continue batching updates.
2443                updates.extend(next_updates);
2444                (Self::Updates(updates), (Vec::new(), Vec::new()))
2445            }
2446            (apply_state, next_apply_state) => {
2447                // Apply the current batch and start batching new apply state.
2448                let updates = apply_state
2449                    .apply(state, retractions, local_expression_cache)
2450                    .await;
2451                (next_apply_state, updates)
2452            }
2453        }
2454    }
2455}
2456
/// Trait abstracting over map operations needed by [`apply_inverted_lookup`] and
/// [`apply_with_update`]. Both [`BTreeMap`] and [`imbl::OrdMap`] implement this.
trait MutableMap<K, V> {
    /// Inserts `value` at `key`, returning the previously stored value, if any.
    fn insert(&mut self, key: K, value: V) -> Option<V>;
    /// Removes the value stored at `key`, returning it if it was present.
    fn remove(&mut self, key: &K) -> Option<V>;
}
2463
2464impl<K: Ord, V> MutableMap<K, V> for BTreeMap<K, V> {
2465    fn insert(&mut self, key: K, value: V) -> Option<V> {
2466        BTreeMap::insert(self, key, value)
2467    }
2468    fn remove(&mut self, key: &K) -> Option<V> {
2469        BTreeMap::remove(self, key)
2470    }
2471}
2472
2473impl<K: Ord + Clone, V: Clone> MutableMap<K, V> for imbl::OrdMap<K, V> {
2474    fn insert(&mut self, key: K, value: V) -> Option<V> {
2475        imbl::OrdMap::insert(self, key, value)
2476    }
2477    fn remove(&mut self, key: &K) -> Option<V> {
2478        imbl::OrdMap::remove(self, key)
2479    }
2480}
2481
2482/// Helper method to updated inverted lookup maps. The keys are generally names and the values are
2483/// generally IDs.
2484///
2485/// Importantly, when retracting it's expected that the existing value will match `value` exactly.
2486fn apply_inverted_lookup<K, V>(map: &mut impl MutableMap<K, V>, key: &K, value: V, diff: StateDiff)
2487where
2488    K: Ord + Clone + Debug,
2489    V: PartialEq + Debug,
2490{
2491    match diff {
2492        StateDiff::Retraction => {
2493            let prev = map.remove(key);
2494            assert_eq!(
2495                prev,
2496                Some(value),
2497                "retraction does not match existing value: {key:?}"
2498            );
2499        }
2500        StateDiff::Addition => {
2501            let prev = map.insert(key.clone(), value);
2502            assert_eq!(
2503                prev, None,
2504                "values must be explicitly retracted before inserting a new value: {key:?}"
2505            );
2506        }
2507    }
2508}
2509
2510/// Helper method to update catalog state, that may need to be updated from a previously retracted
2511/// object.
2512fn apply_with_update<K, V, D>(
2513    map: &mut impl MutableMap<K, V>,
2514    durable: D,
2515    key_fn: impl FnOnce(&D) -> K,
2516    diff: StateDiff,
2517    retractions: &mut BTreeMap<D::Key, V>,
2518) where
2519    K: Ord,
2520    V: UpdateFrom<D> + PartialEq + Debug,
2521    D: DurableType,
2522    D::Key: Ord,
2523{
2524    match diff {
2525        StateDiff::Retraction => {
2526            let mem_key = key_fn(&durable);
2527            let value = map
2528                .remove(&mem_key)
2529                .expect("retraction does not match existing value: {key:?}");
2530            let durable_key = durable.into_key_value().0;
2531            retractions.insert(durable_key, value);
2532        }
2533        StateDiff::Addition => {
2534            let mem_key = key_fn(&durable);
2535            let durable_key = durable.key();
2536            let value = match retractions.remove(&durable_key) {
2537                Some(mut retraction) => {
2538                    retraction.update_from(durable);
2539                    retraction
2540                }
2541                None => durable.into(),
2542            };
2543            let prev = map.insert(mem_key, value);
2544            assert_eq!(
2545                prev, None,
2546                "values must be explicitly retracted before inserting a new value"
2547            );
2548        }
2549    }
2550}
2551
2552/// Looks up a [`BuiltinView`] from a [`SystemObjectMapping`].
2553fn lookup_builtin_view_addition(
2554    mapping: SystemObjectMapping,
2555) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2556    let (_, builtin) = BUILTIN_LOOKUP
2557        .get(&mapping.description)
2558        .expect("missing builtin view");
2559    let Builtin::View(view) = builtin else {
2560        unreachable!("programming error, expected BuiltinView found {builtin:?}");
2561    };
2562
2563    (
2564        view,
2565        mapping.unique_identifier.catalog_id,
2566        mapping.unique_identifier.global_id,
2567    )
2568}