Skip to main content

mz_adapter/catalog/
apply.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to applying updates from a [`mz_catalog::durable::DurableCatalogState`] to a
11//! [`CatalogState`].
12
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25    BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28    ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29    SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, Func, Index, Log, NetworkPolicy,
35    Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36    TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
40use mz_controller_types::ClusterId;
41use mz_expr::MirScalarExpr;
42use mz_ore::collections::CollectionExt;
43use mz_ore::tracing::OpenTelemetryContext;
44use mz_ore::{instrument, soft_assert_no_log};
45use mz_pgrepr::oid::INVALID_OID;
46use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
47use mz_repr::role_id::RoleId;
48use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
49use mz_sql::catalog::CatalogError as SqlCatalogError;
50use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
51use mz_sql::names::{
52    FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
53    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
54};
55use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
56use mz_sql::session::vars::{VarError, VarInput};
57use mz_sql::{plan, rbac};
58use mz_sql_parser::ast::Expr;
59use mz_storage_types::sources::Timeline;
60use tracing::{info_span, warn};
61
62use crate::AdapterError;
63use crate::catalog::state::LocalExpressionCache;
64use crate::catalog::{BuiltinTableUpdate, CatalogState};
65use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
66use crate::util::{index_sql, sort_topological};
67
/// Maintains the state of retractions while applying catalog state updates for a single timestamp.
/// [`CatalogState`] maintains denormalized state for certain catalog objects. Updating an object
/// results in applying a retraction for that object followed by applying an addition for that
/// object. When applying those additions it can be extremely expensive to re-build that
/// denormalized state from scratch. To avoid that issue we stash the denormalized state from
/// retractions, so it can be used during additions.
///
/// Not all objects maintain denormalized state, so we only stash the retractions for the subset of
/// objects that maintain denormalized state.
// TODO(jkosh44) It might be simpler or more future proof to include all object types here, even if
// the update step is a no-op for certain types.
#[derive(Debug, Clone, Default)]
struct InProgressRetractions {
    // Each map is keyed by the durable key type of the corresponding object and
    // holds the in-memory representation removed by a retraction at the current
    // timestamp, so that a following addition can reuse it.
    roles: BTreeMap<RoleKey, Role>,
    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
    databases: BTreeMap<DatabaseKey, Database>,
    schemas: BTreeMap<SchemaKey, Schema>,
    clusters: BTreeMap<ClusterKey, Cluster>,
    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
    items: BTreeMap<ItemKey, CatalogEntry>,
    // Temporary items, introspection source indexes, and system object
    // mappings are keyed directly by `CatalogItemId`.
    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}
92
93impl CatalogState {
    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
    ///
    /// Returns builtin table updates corresponding to the changes to catalog state, along with
    /// the parsed state updates produced while applying them.
    #[must_use]
    #[instrument]
    pub(crate) async fn apply_updates(
        &mut self,
        updates: Vec<StateUpdate>,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
        Vec<ParsedStateUpdate>,
    ) {
        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::with_capacity(updates.len());

        // First, consolidate updates. The code that applies parsed state
        // updates _requires_ that the given updates are consolidated. There
        // must be at most one addition and/or one retraction for a given item,
        // as identified by that items ID type.
        let updates = Self::consolidate_updates(updates);

        // Apply updates in groups, according to their timestamps.
        let mut groups: Vec<Vec<_>> = Vec::new();
        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
            // Bring the updates into the pseudo-topological order that we need
            // for updating our in-memory state and generating builtin table
            // updates.
            let updates = sort_updates(updates.collect());
            groups.push(updates);
        }

        for updates in groups {
            // Retractions are scoped to a single timestamp group: an addition
            // may only reuse denormalized state stashed by a retraction at the
            // same timestamp.
            let mut apply_state = ApplyState::Updates(Vec::new());
            let mut retractions = InProgressRetractions::default();

            for update in updates {
                // NOTE(review): `ApplyState::step` appears to fold each new
                // update into the in-progress state and emit any updates that
                // became ready — confirm against `ApplyState`'s definition.
                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
                    .step(
                        ApplyState::new(update),
                        self,
                        &mut retractions,
                        local_expression_cache,
                    )
                    .await;
                apply_state = next_apply_state;
                builtin_table_updates.extend(builtin_table_update);
                catalog_updates.extend(catalog_update);
            }

            // Apply remaining state.
            let (builtin_table_update, catalog_update) = apply_state
                .apply(self, &mut retractions, local_expression_cache)
                .await;
            builtin_table_updates.extend(builtin_table_update);
            catalog_updates.extend(catalog_update);
        }

        (builtin_table_updates, catalog_updates)
    }
154
155    /// It can happen that the sequencing logic creates "fluctuating" updates
156    /// for a given catalog ID. For example, when doing a `DROP OWNED BY ...`,
157    /// for a table, there will be a retraction of the original table state,
158    /// then an addition for the same table but stripped of some of the roles
159    /// and access things, and then a retraction for that intermediate table
160    /// state. By consolidating, the intermediate state addition/retraction will
161    /// cancel out and we'll only see the retraction for the original state.
162    fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
163        let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
164            .into_iter()
165            .map(|update| (update.kind, update.ts, update.diff.into()))
166            .collect_vec();
167
168        consolidate_updates(&mut updates);
169
170        updates
171            .into_iter()
172            .map(|(kind, ts, diff)| StateUpdate {
173                kind,
174                ts,
175                diff: diff
176                    .try_into()
177                    .expect("catalog state cannot have diff other than -1 or 1"),
178            })
179            .collect_vec()
180    }
181
    /// Applies a batch of same-timestamp updates to in-memory state and produces the
    /// corresponding builtin table updates and parsed catalog updates.
    ///
    /// Ordering matters: for retractions, builtin table updates and parsed updates are
    /// generated _before_ mutating state (so they reflect the pre-retraction catalog);
    /// for additions they are generated _after_ (so they reflect the post-addition
    /// catalog).
    #[instrument(level = "debug")]
    fn apply_updates_inner(
        &mut self,
        updates: Vec<StateUpdate>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        soft_assert_no_log!(
            updates.iter().map(|update| update.ts).all_equal(),
            "all timestamps should be equal: {updates:?}"
        );

        // Tracks whether any system configuration changed, so we can push the
        // new values into the dyncfg system once at the end of the batch.
        let mut update_system_config = false;

        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::new();

        for state_update in updates {
            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
                update_system_config = true;
            }

            match state_update.diff {
                StateDiff::Retraction => {
                    // We want the parsed catalog updates to match the state of
                    // the catalog _before_ applying a retraction. So that we can
                    // still have useful in-memory state to work with.
                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }

                    // We want the builtin table retraction to match the state of the catalog
                    // before applying the update.
                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));
                    self.apply_update(
                        state_update.kind,
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                }
                StateDiff::Addition => {
                    self.apply_update(
                        state_update.kind.clone(),
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                    // We want the builtin table addition to match the state of
                    // the catalog after applying the update. So that we already
                    // have useful in-memory state to work with.
                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));

                    // We want the parsed catalog updates to match the state of
                    // the catalog _after_ applying an addition.
                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }
                }
            }
        }

        if update_system_config {
            self.system_configuration.dyncfg_updates();
        }

        Ok((builtin_table_updates, catalog_updates))
    }
266
267    #[instrument(level = "debug")]
268    fn apply_update(
269        &mut self,
270        kind: StateUpdateKind,
271        diff: StateDiff,
272        retractions: &mut InProgressRetractions,
273        local_expression_cache: &mut LocalExpressionCache,
274    ) -> Result<(), CatalogError> {
275        match kind {
276            StateUpdateKind::Role(role) => {
277                self.apply_role_update(role, diff, retractions);
278            }
279            StateUpdateKind::RoleAuth(role_auth) => {
280                self.apply_role_auth_update(role_auth, diff, retractions);
281            }
282            StateUpdateKind::Database(database) => {
283                self.apply_database_update(database, diff, retractions);
284            }
285            StateUpdateKind::Schema(schema) => {
286                self.apply_schema_update(schema, diff, retractions);
287            }
288            StateUpdateKind::DefaultPrivilege(default_privilege) => {
289                self.apply_default_privilege_update(default_privilege, diff, retractions);
290            }
291            StateUpdateKind::SystemPrivilege(system_privilege) => {
292                self.apply_system_privilege_update(system_privilege, diff, retractions);
293            }
294            StateUpdateKind::SystemConfiguration(system_configuration) => {
295                self.apply_system_configuration_update(system_configuration, diff, retractions);
296            }
297            StateUpdateKind::Cluster(cluster) => {
298                self.apply_cluster_update(cluster, diff, retractions);
299            }
300            StateUpdateKind::NetworkPolicy(network_policy) => {
301                self.apply_network_policy_update(network_policy, diff, retractions);
302            }
303            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
304                self.apply_introspection_source_index_update(
305                    introspection_source_index,
306                    diff,
307                    retractions,
308                );
309            }
310            StateUpdateKind::ClusterReplica(cluster_replica) => {
311                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
312            }
313            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
314                self.apply_system_object_mapping_update(
315                    system_object_mapping,
316                    diff,
317                    retractions,
318                    local_expression_cache,
319                );
320            }
321            StateUpdateKind::TemporaryItem(item) => {
322                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
323            }
324            StateUpdateKind::Item(item) => {
325                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
326            }
327            StateUpdateKind::Comment(comment) => {
328                self.apply_comment_update(comment, diff, retractions);
329            }
330            StateUpdateKind::SourceReferences(source_reference) => {
331                self.apply_source_references_update(source_reference, diff, retractions);
332            }
333            StateUpdateKind::AuditLog(_audit_log) => {
334                // Audit logs are not stored in-memory.
335            }
336            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
337                self.apply_storage_collection_metadata_update(
338                    storage_collection_metadata,
339                    diff,
340                    retractions,
341                );
342            }
343            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
344                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
345            }
346        }
347
348        Ok(())
349    }
350
351    #[instrument(level = "debug")]
352    fn apply_role_auth_update(
353        &mut self,
354        role_auth: mz_catalog::durable::RoleAuth,
355        diff: StateDiff,
356        retractions: &mut InProgressRetractions,
357    ) {
358        apply_with_update(
359            &mut self.role_auth_by_id,
360            role_auth,
361            |role_auth| role_auth.role_id,
362            diff,
363            &mut retractions.role_auths,
364        );
365    }
366
367    #[instrument(level = "debug")]
368    fn apply_role_update(
369        &mut self,
370        role: mz_catalog::durable::Role,
371        diff: StateDiff,
372        retractions: &mut InProgressRetractions,
373    ) {
374        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
375        apply_with_update(
376            &mut self.roles_by_id,
377            role,
378            |role| role.id,
379            diff,
380            &mut retractions.roles,
381        );
382    }
383
384    #[instrument(level = "debug")]
385    fn apply_database_update(
386        &mut self,
387        database: mz_catalog::durable::Database,
388        diff: StateDiff,
389        retractions: &mut InProgressRetractions,
390    ) {
391        apply_inverted_lookup(
392            &mut self.database_by_name,
393            &database.name,
394            database.id,
395            diff,
396        );
397        apply_with_update(
398            &mut self.database_by_id,
399            database,
400            |database| database.id,
401            diff,
402            &mut retractions.databases,
403        );
404    }
405
406    #[instrument(level = "debug")]
407    fn apply_schema_update(
408        &mut self,
409        schema: mz_catalog::durable::Schema,
410        diff: StateDiff,
411        retractions: &mut InProgressRetractions,
412    ) {
413        match &schema.database_id {
414            Some(database_id) => {
415                let db = self
416                    .database_by_id
417                    .get_mut(database_id)
418                    .expect("catalog out of sync");
419                apply_inverted_lookup(&mut db.schemas_by_name, &schema.name, schema.id, diff);
420                apply_with_update(
421                    &mut db.schemas_by_id,
422                    schema,
423                    |schema| schema.id,
424                    diff,
425                    &mut retractions.schemas,
426                );
427            }
428            None => {
429                apply_inverted_lookup(
430                    &mut self.ambient_schemas_by_name,
431                    &schema.name,
432                    schema.id,
433                    diff,
434                );
435                apply_with_update(
436                    &mut self.ambient_schemas_by_id,
437                    schema,
438                    |schema| schema.id,
439                    diff,
440                    &mut retractions.schemas,
441                );
442            }
443        }
444    }
445
446    #[instrument(level = "debug")]
447    fn apply_default_privilege_update(
448        &mut self,
449        default_privilege: mz_catalog::durable::DefaultPrivilege,
450        diff: StateDiff,
451        _retractions: &mut InProgressRetractions,
452    ) {
453        match diff {
454            StateDiff::Addition => Arc::make_mut(&mut self.default_privileges)
455                .grant(default_privilege.object, default_privilege.acl_item),
456            StateDiff::Retraction => Arc::make_mut(&mut self.default_privileges)
457                .revoke(&default_privilege.object, &default_privilege.acl_item),
458        }
459    }
460
461    #[instrument(level = "debug")]
462    fn apply_system_privilege_update(
463        &mut self,
464        system_privilege: MzAclItem,
465        diff: StateDiff,
466        _retractions: &mut InProgressRetractions,
467    ) {
468        match diff {
469            StateDiff::Addition => {
470                Arc::make_mut(&mut self.system_privileges).grant(system_privilege)
471            }
472            StateDiff::Retraction => {
473                Arc::make_mut(&mut self.system_privileges).revoke(&system_privilege)
474            }
475        }
476    }
477
478    #[instrument(level = "debug")]
479    fn apply_system_configuration_update(
480        &mut self,
481        system_configuration: mz_catalog::durable::SystemConfiguration,
482        diff: StateDiff,
483        _retractions: &mut InProgressRetractions,
484    ) {
485        let res = match diff {
486            StateDiff::Addition => self.insert_system_configuration(
487                &system_configuration.name,
488                VarInput::Flat(&system_configuration.value),
489            ),
490            StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
491        };
492        match res {
493            Ok(_) => (),
494            // When system variables are deleted, nothing deletes them from the underlying
495            // durable catalog, which isn't great. Still, we need to be able to ignore
496            // unknown variables.
497            Err(Error {
498                kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
499            }) => {
500                warn!(%name, "unknown system parameter from catalog storage");
501            }
502            Err(e) => panic!("unable to update system variable: {e:?}"),
503        }
504    }
505
506    #[instrument(level = "debug")]
507    fn apply_cluster_update(
508        &mut self,
509        cluster: mz_catalog::durable::Cluster,
510        diff: StateDiff,
511        retractions: &mut InProgressRetractions,
512    ) {
513        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
514        apply_with_update(
515            &mut self.clusters_by_id,
516            cluster,
517            |cluster| cluster.id,
518            diff,
519            &mut retractions.clusters,
520        );
521    }
522
523    #[instrument(level = "debug")]
524    fn apply_network_policy_update(
525        &mut self,
526        policy: mz_catalog::durable::NetworkPolicy,
527        diff: StateDiff,
528        retractions: &mut InProgressRetractions,
529    ) {
530        apply_inverted_lookup(
531            &mut self.network_policies_by_name,
532            &policy.name,
533            policy.id,
534            diff,
535        );
536        apply_with_update(
537            &mut self.network_policies_by_id,
538            policy,
539            |policy| policy.id,
540            diff,
541            &mut retractions.network_policies,
542        );
543    }
544
    /// Applies a durable introspection source index update: keeps the owning
    /// cluster's `log_indexes` map in sync and inserts or drops the
    /// corresponding catalog item.
    #[instrument(level = "debug")]
    fn apply_introspection_source_index_update(
        &mut self,
        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        let cluster = self
            .clusters_by_id
            .get_mut(&introspection_source_index.cluster_id)
            .expect("catalog out of sync");
        // The durable record stores only the builtin log's name; resolve it
        // back to the compiled-in builtin definition.
        let log = BUILTIN_LOG_LOOKUP
            .get(introspection_source_index.name.as_str())
            .expect("missing log");
        // Update the cluster's log-variant -> index-id map in the direction of
        // the diff.
        apply_inverted_lookup(
            &mut cluster.log_indexes,
            &log.variant,
            introspection_source_index.index_id,
            diff,
        );

        match diff {
            StateDiff::Addition => {
                if let Some(mut entry) = retractions
                    .introspection_source_indexes
                    .remove(&introspection_source_index.item_id)
                {
                    // This should only happen during startup as a result of builtin migrations. We
                    // create a new index item and replace the old one with it.
                    let (index_name, index) = self.create_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.index_id,
                    );
                    // The retracted entry must describe the same object; only
                    // the item definition is replaced.
                    assert_eq!(entry.id, introspection_source_index.item_id);
                    assert_eq!(entry.oid, introspection_source_index.oid);
                    assert_eq!(entry.name, index_name);
                    entry.item = index;
                    self.insert_entry(entry);
                } else {
                    // Fresh addition: build the item from scratch.
                    self.insert_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.item_id,
                        introspection_source_index.index_id,
                        introspection_source_index.oid,
                    );
                }
            }
            StateDiff::Retraction => {
                // Stash the dropped entry so a same-timestamp addition can
                // reuse it (see the `Addition` branch above).
                let entry = self.drop_item(introspection_source_index.item_id);
                retractions
                    .introspection_source_indexes
                    .insert(entry.id, entry);
            }
        }
    }
602
    /// Applies a durable cluster replica update to the owning cluster's
    /// in-memory replica maps.
    ///
    /// Panics if the owning cluster is missing, if the replica location cannot
    /// be concretized, or if an addition/retraction does not match the current
    /// map contents — all of which indicate the catalog is out of sync.
    #[instrument(level = "debug")]
    fn apply_cluster_replica_update(
        &mut self,
        cluster_replica: mz_catalog::durable::ClusterReplica,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        // Borrow the cluster immutably first to compute the concrete replica
        // location (which needs `&self`), then re-borrow mutably to apply the
        // change.
        let cluster = self
            .clusters_by_id
            .get(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        let azs = cluster.availability_zones();
        let location = self
            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
            .expect("catalog in unexpected state");
        let cluster = self
            .clusters_by_id
            .get_mut(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        // Keep the replica name -> id index in sync with the change.
        apply_inverted_lookup(
            &mut cluster.replica_id_by_name_,
            &cluster_replica.name,
            cluster_replica.replica_id,
            diff,
        );
        match diff {
            StateDiff::Retraction => {
                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
                assert!(
                    prev.is_some(),
                    "retraction does not match existing value: {:?}",
                    cluster_replica.replica_id
                );
            }
            StateDiff::Addition => {
                // Build the in-memory replica config from the durable config
                // plus the concretized location computed above.
                let logging = ReplicaLogging {
                    log_logging: cluster_replica.config.logging.log_logging,
                    interval: cluster_replica.config.logging.interval,
                };
                let config = ReplicaConfig {
                    location,
                    compute: ComputeReplicaConfig { logging },
                };
                let mem_cluster_replica = ClusterReplica {
                    name: cluster_replica.name.clone(),
                    cluster_id: cluster_replica.cluster_id,
                    replica_id: cluster_replica.replica_id,
                    config,
                    owner_id: cluster_replica.owner_id,
                };
                let prev = cluster
                    .replicas_by_id_
                    .insert(cluster_replica.replica_id, mem_cluster_replica);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value: {:?}",
                    cluster_replica.replica_id
                );
            }
        }
    }
664
665    #[instrument(level = "debug")]
666    fn apply_system_object_mapping_update(
667        &mut self,
668        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
669        diff: StateDiff,
670        retractions: &mut InProgressRetractions,
671        local_expression_cache: &mut LocalExpressionCache,
672    ) {
673        let item_id = system_object_mapping.unique_identifier.catalog_id;
674        let global_id = system_object_mapping.unique_identifier.global_id;
675
676        if system_object_mapping.unique_identifier.runtime_alterable() {
677            // Runtime-alterable system objects have real entries in the items
678            // collection and so get handled through the normal `insert_item`
679            // and `drop_item` code paths.
680            return;
681        }
682
683        if let StateDiff::Retraction = diff {
684            let entry = self.drop_item(item_id);
685            retractions.system_object_mappings.insert(item_id, entry);
686            return;
687        }
688
689        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
690            // This implies that we updated the fingerprint for some builtin item. The retraction
691            // was parsed, planned, and optimized using the compiled in definition, not the
692            // definition from a previous version. So we can just stick the old entry back into the
693            // catalog.
694            self.insert_entry(entry);
695            return;
696        }
697
698        let builtin = BUILTIN_LOOKUP
699            .get(&system_object_mapping.description)
700            .expect("missing builtin")
701            .1;
702        let schema_name = builtin.schema();
703        let schema_id = self
704            .ambient_schemas_by_name
705            .get(schema_name)
706            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
707        let name = QualifiedItemName {
708            qualifiers: ItemQualifiers {
709                database_spec: ResolvedDatabaseSpecifier::Ambient,
710                schema_spec: SchemaSpecifier::Id(*schema_id),
711            },
712            item: builtin.name().into(),
713        };
714        match builtin {
715            Builtin::Log(log) => {
716                let mut acl_items = vec![rbac::owner_privilege(
717                    mz_sql::catalog::ObjectType::Source,
718                    MZ_SYSTEM_ROLE_ID,
719                )];
720                acl_items.extend_from_slice(&log.access);
721                self.insert_item(
722                    item_id,
723                    log.oid,
724                    name.clone(),
725                    CatalogItem::Log(Log {
726                        variant: log.variant,
727                        global_id,
728                    }),
729                    MZ_SYSTEM_ROLE_ID,
730                    PrivilegeMap::from_mz_acl_items(acl_items),
731                );
732            }
733
734            Builtin::Table(table) => {
735                let mut acl_items = vec![rbac::owner_privilege(
736                    mz_sql::catalog::ObjectType::Table,
737                    MZ_SYSTEM_ROLE_ID,
738                )];
739                acl_items.extend_from_slice(&table.access);
740
741                self.insert_item(
742                    item_id,
743                    table.oid,
744                    name.clone(),
745                    CatalogItem::Table(Table {
746                        create_sql: None,
747                        desc: VersionedRelationDesc::new(table.desc.clone()),
748                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
749                        conn_id: None,
750                        resolved_ids: ResolvedIds::empty(),
751                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
752                            || {
753                                self.system_config()
754                                    .metrics_retention()
755                                    .try_into()
756                                    .expect("invalid metrics retention")
757                            },
758                        ),
759                        is_retained_metrics_object: table.is_retained_metrics_object,
760                        data_source: TableDataSource::TableWrites {
761                            defaults: vec![Expr::null(); table.desc.arity()],
762                        },
763                    }),
764                    MZ_SYSTEM_ROLE_ID,
765                    PrivilegeMap::from_mz_acl_items(acl_items),
766                );
767            }
768            Builtin::Index(index) => {
769                let custom_logical_compaction_window =
770                    index.is_retained_metrics_object.then(|| {
771                        self.system_config()
772                            .metrics_retention()
773                            .try_into()
774                            .expect("invalid metrics retention")
775                    });
776                // Indexes can't be versioned.
777                let versions = BTreeMap::new();
778
779                let item = self
780                    .parse_item(
781                        global_id,
782                        &index.create_sql(),
783                        &versions,
784                        None,
785                        index.is_retained_metrics_object,
786                        custom_logical_compaction_window,
787                        local_expression_cache,
788                        None,
789                    )
790                    .unwrap_or_else(|e| {
791                        panic!(
792                            "internal error: failed to load bootstrap index:\n\
793                                    {}\n\
794                                    error:\n\
795                                    {:?}\n\n\
796                                    make sure that the schema name is specified in the builtin index's create sql statement.",
797                            index.name, e
798                        )
799                    });
800                let CatalogItem::Index(_) = item else {
801                    panic!(
802                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
803                        index.name
804                    );
805                };
806
807                self.insert_item(
808                    item_id,
809                    index.oid,
810                    name,
811                    item,
812                    MZ_SYSTEM_ROLE_ID,
813                    PrivilegeMap::default(),
814                );
815            }
816            Builtin::View(_) => {
817                // parse_views is responsible for inserting all builtin views.
818                unreachable!("views added elsewhere");
819            }
820
821            // Note: Element types must be loaded before array types.
822            Builtin::Type(typ) => {
823                let typ = self.resolve_builtin_type_references(typ);
824                if let CatalogType::Array { element_reference } = typ.details.typ {
825                    let entry = self.get_entry_mut(&element_reference);
826                    let item_type = match &mut entry.item {
827                        CatalogItem::Type(item_type) => item_type,
828                        _ => unreachable!("types can only reference other types"),
829                    };
830                    item_type.details.array_id = Some(item_id);
831                }
832
833                let schema_id = self.resolve_system_schema(typ.schema);
834
835                self.insert_item(
836                    item_id,
837                    typ.oid,
838                    QualifiedItemName {
839                        qualifiers: ItemQualifiers {
840                            database_spec: ResolvedDatabaseSpecifier::Ambient,
841                            schema_spec: SchemaSpecifier::Id(schema_id),
842                        },
843                        item: typ.name.to_owned(),
844                    },
845                    CatalogItem::Type(Type {
846                        create_sql: None,
847                        global_id,
848                        details: typ.details.clone(),
849                        resolved_ids: ResolvedIds::empty(),
850                    }),
851                    MZ_SYSTEM_ROLE_ID,
852                    PrivilegeMap::from_mz_acl_items(vec![
853                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
854                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
855                    ]),
856                );
857            }
858
859            Builtin::Func(func) => {
860                // This OID is never used. `func` has a `Vec` of implementations and
861                // each implementation has its own OID. Those are the OIDs that are
862                // actually used by the system.
863                let oid = INVALID_OID;
864                self.insert_item(
865                    item_id,
866                    oid,
867                    name.clone(),
868                    CatalogItem::Func(Func {
869                        inner: func.inner,
870                        global_id,
871                    }),
872                    MZ_SYSTEM_ROLE_ID,
873                    PrivilegeMap::default(),
874                );
875            }
876
877            Builtin::Source(coll) => {
878                let mut acl_items = vec![rbac::owner_privilege(
879                    mz_sql::catalog::ObjectType::Source,
880                    MZ_SYSTEM_ROLE_ID,
881                )];
882                acl_items.extend_from_slice(&coll.access);
883
884                self.insert_item(
885                    item_id,
886                    coll.oid,
887                    name.clone(),
888                    CatalogItem::Source(Source {
889                        create_sql: None,
890                        data_source: coll.data_source.clone(),
891                        desc: coll.desc.clone(),
892                        global_id,
893                        timeline: Timeline::EpochMilliseconds,
894                        resolved_ids: ResolvedIds::empty(),
895                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
896                            || {
897                                self.system_config()
898                                    .metrics_retention()
899                                    .try_into()
900                                    .expect("invalid metrics retention")
901                            },
902                        ),
903                        is_retained_metrics_object: coll.is_retained_metrics_object,
904                    }),
905                    MZ_SYSTEM_ROLE_ID,
906                    PrivilegeMap::from_mz_acl_items(acl_items),
907                );
908            }
909            Builtin::MaterializedView(mv) => {
910                let mut acl_items = vec![rbac::owner_privilege(
911                    mz_sql::catalog::ObjectType::MaterializedView,
912                    MZ_SYSTEM_ROLE_ID,
913                )];
914                acl_items.extend_from_slice(&mv.access);
915                // Builtin materialized views can't be versioned.
916                let versions = BTreeMap::new();
917
918                let mut item = self
919                    .parse_item(
920                        global_id,
921                        &mv.create_sql(),
922                        &versions,
923                        None,
924                        false,
925                        None,
926                        local_expression_cache,
927                        None,
928                    )
929                    .unwrap_or_else(|e| {
930                        panic!(
931                            "internal error: failed to load bootstrap materialized view:\n\
932                             {}\n\
933                             error:\n\
934                             {e:?}\n\n\
935                             make sure that the schema name is specified in the builtin \
936                             materialized view's create sql statement.",
937                            mv.name,
938                        )
939                    });
940                let CatalogItem::MaterializedView(catalog_mv) = &mut item else {
941                    panic!(
942                        "internal error: builtin materialized view {}'s SQL does not begin \
943                         with \"CREATE MATERIALIZED VIEW\".",
944                        mv.name,
945                    );
946                };
947
948                // The optimizer can only infer keys from MV definitions, but cannot infer
949                // uniqueness present in the input data. Extend with the keys declared in the
950                // builtin definition, to allow supplying additional key knowledge.
951                let mut desc = catalog_mv.desc.latest();
952                for key in &mv.desc.typ().keys {
953                    desc = desc.with_key(key.clone());
954                }
955                catalog_mv.desc = VersionedRelationDesc::new(desc);
956
957                self.insert_item(
958                    item_id,
959                    mv.oid,
960                    name,
961                    item,
962                    MZ_SYSTEM_ROLE_ID,
963                    PrivilegeMap::from_mz_acl_items(acl_items),
964                );
965            }
966            Builtin::ContinualTask(ct) => {
967                let mut acl_items = vec![rbac::owner_privilege(
968                    mz_sql::catalog::ObjectType::Source,
969                    MZ_SYSTEM_ROLE_ID,
970                )];
971                acl_items.extend_from_slice(&ct.access);
972                // Continual Tasks can't be versioned.
973                let versions = BTreeMap::new();
974
975                let item = self
976                    .parse_item(
977                        global_id,
978                        &ct.create_sql(),
979                        &versions,
980                        None,
981                        false,
982                        None,
983                        local_expression_cache,
984                        None,
985                    )
986                    .unwrap_or_else(|e| {
987                        panic!(
988                            "internal error: failed to load bootstrap continual task:\n\
989                                    {}\n\
990                                    error:\n\
991                                    {:?}\n\n\
992                                    make sure that the schema name is specified in the builtin continual task's create sql statement.",
993                            ct.name, e
994                        )
995                    });
996                let CatalogItem::ContinualTask(_) = &item else {
997                    panic!(
998                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
999                        ct.name
1000                    );
1001                };
1002
1003                self.insert_item(
1004                    item_id,
1005                    ct.oid,
1006                    name,
1007                    item,
1008                    MZ_SYSTEM_ROLE_ID,
1009                    PrivilegeMap::from_mz_acl_items(acl_items),
1010                );
1011            }
1012            Builtin::Connection(connection) => {
1013                // Connections can't be versioned.
1014                let versions = BTreeMap::new();
1015                let mut item = self
1016                    .parse_item(
1017                        global_id,
1018                        connection.sql,
1019                        &versions,
1020                        None,
1021                        false,
1022                        None,
1023                        local_expression_cache,
1024                        None,
1025                    )
1026                    .unwrap_or_else(|e| {
1027                        panic!(
1028                            "internal error: failed to load bootstrap connection:\n\
1029                                    {}\n\
1030                                    error:\n\
1031                                    {:?}\n\n\
1032                                    make sure that the schema name is specified in the builtin connection's create sql statement.",
1033                            connection.name, e
1034                        )
1035                    });
1036                let CatalogItem::Connection(_) = &mut item else {
1037                    panic!(
1038                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
1039                        connection.name
1040                    );
1041                };
1042
1043                let mut acl_items = vec![rbac::owner_privilege(
1044                    mz_sql::catalog::ObjectType::Connection,
1045                    connection.owner_id.clone(),
1046                )];
1047                acl_items.extend_from_slice(connection.access);
1048
1049                self.insert_item(
1050                    item_id,
1051                    connection.oid,
1052                    name.clone(),
1053                    item,
1054                    connection.owner_id.clone(),
1055                    PrivilegeMap::from_mz_acl_items(acl_items),
1056                );
1057            }
1058        }
1059    }
1060
    /// Applies an update for a [`TemporaryItem`] to the in-memory catalog state.
    ///
    /// Additions insert (or re-insert) the item into its session's temporary schema, lazily
    /// creating that schema on first use. Retractions drop the item and stash the dropped
    /// entry in `retractions` so a matching addition in the same batch can reuse it.
    #[instrument(level = "debug")]
    fn apply_temporary_item_update(
        &mut self,
        temporary_item: TemporaryItem,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        match diff {
            StateDiff::Addition => {
                let TemporaryItem {
                    id,
                    oid,
                    global_id,
                    schema_id,
                    name,
                    conn_id,
                    create_sql,
                    owner_id,
                    privileges,
                    extra_versions,
                } = temporary_item;
                // Lazily create the temporary schema if it doesn't exist yet.
                // We need the conn_id to create the schema, and it should always be Some for temp items.
                let temp_conn_id = conn_id
                    .as_ref()
                    .expect("temporary items must have a connection id");
                if !self.temporary_schemas.contains_key(temp_conn_id) {
                    self.create_temporary_schema(temp_conn_id, owner_id)
                        .expect("failed to create temporary schema");
                }
                // Fully qualify the item's name with the temporary schema's
                // database/schema specifiers.
                let schema = self.find_temp_schema(&schema_id);
                let name = QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: schema.database().clone(),
                        schema_spec: schema.id().clone(),
                    },
                    item: name.clone(),
                };

                // If this addition pairs with an earlier retraction in the same
                // batch, reuse the retracted entry instead of rebuilding it.
                let entry = match retractions.temp_items.remove(&id) {
                    Some(mut retraction) => {
                        assert_eq!(retraction.id, id);

                        // We only reparse the SQL if it's changed. Otherwise, we use the existing
                        // item. This is a performance optimization and not needed for correctness.
                        // This makes it difficult to use the `UpdateFrom` trait, but the structure
                        // is still the same as the trait.
                        if retraction.create_sql() != create_sql {
                            let mut catalog_item = self
                                .deserialize_item(
                                    global_id,
                                    &create_sql,
                                    &extra_versions,
                                    local_expression_cache,
                                    Some(retraction.item),
                                )
                                .unwrap_or_else(|e| {
                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
                                });
                            // Have to patch up the item because parsing doesn't
                            // take into account temporary schemas/conn_id.
                            // NOTE(aljoscha): I don't like how we're patching
                            // this in here, but it's but one of the ways in
                            // which temporary items are a bit weird. So, here
                            // we are ...
                            catalog_item.set_conn_id(conn_id);
                            retraction.item = catalog_item;
                        }

                        // Refresh metadata fields that may have changed even
                        // when the SQL did not.
                        retraction.id = id;
                        retraction.oid = oid;
                        retraction.name = name;
                        retraction.owner_id = owner_id;
                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
                        retraction
                    }
                    None => {
                        let mut catalog_item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                None,
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });

                        // Have to patch up the item because parsing doesn't
                        // take into account temporary schemas/conn_id.
                        // NOTE(aljoscha): I don't like how we're patching this
                        // in here, but it's but one of the ways in which
                        // temporary items are a bit weird. So, here we are ...
                        catalog_item.set_conn_id(conn_id);

                        CatalogEntry {
                            item: catalog_item,
                            referenced_by: Vec::new(),
                            used_by: Vec::new(),
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                        }
                    }
                };
                self.insert_entry(entry);
            }
            StateDiff::Retraction => {
                // Remember the dropped entry so a paired addition can reuse it.
                let entry = self.drop_item(temporary_item.id);
                retractions.temp_items.insert(temporary_item.id, entry);
            }
        }
    }
1178
1179    #[instrument(level = "debug")]
1180    fn apply_item_update(
1181        &mut self,
1182        item: mz_catalog::durable::Item,
1183        diff: StateDiff,
1184        retractions: &mut InProgressRetractions,
1185        local_expression_cache: &mut LocalExpressionCache,
1186    ) -> Result<(), CatalogError> {
1187        match diff {
1188            StateDiff::Addition => {
1189                let key = item.key();
1190                let mz_catalog::durable::Item {
1191                    id,
1192                    oid,
1193                    global_id,
1194                    schema_id,
1195                    name,
1196                    create_sql,
1197                    owner_id,
1198                    privileges,
1199                    extra_versions,
1200                } = item;
1201                let schema = self.find_non_temp_schema(&schema_id);
1202                let name = QualifiedItemName {
1203                    qualifiers: ItemQualifiers {
1204                        database_spec: schema.database().clone(),
1205                        schema_spec: schema.id().clone(),
1206                    },
1207                    item: name.clone(),
1208                };
1209                let entry = match retractions.items.remove(&key) {
1210                    Some(retraction) => {
1211                        assert_eq!(retraction.id, item.id);
1212
1213                        let item = self
1214                            .deserialize_item(
1215                                global_id,
1216                                &create_sql,
1217                                &extra_versions,
1218                                local_expression_cache,
1219                                Some(retraction.item),
1220                            )
1221                            .unwrap_or_else(|e| {
1222                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1223                            });
1224
1225                        CatalogEntry {
1226                            item,
1227                            id,
1228                            oid,
1229                            name,
1230                            owner_id,
1231                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1232                            referenced_by: retraction.referenced_by,
1233                            used_by: retraction.used_by,
1234                        }
1235                    }
1236                    None => {
1237                        let catalog_item = self
1238                            .deserialize_item(
1239                                global_id,
1240                                &create_sql,
1241                                &extra_versions,
1242                                local_expression_cache,
1243                                None,
1244                            )
1245                            .unwrap_or_else(|e| {
1246                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1247                            });
1248                        CatalogEntry {
1249                            item: catalog_item,
1250                            referenced_by: Vec::new(),
1251                            used_by: Vec::new(),
1252                            id,
1253                            oid,
1254                            name,
1255                            owner_id,
1256                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1257                        }
1258                    }
1259                };
1260
1261                self.insert_entry(entry);
1262            }
1263            StateDiff::Retraction => {
1264                let entry = self.drop_item(item.id);
1265                let key = item.into_key_value().0;
1266                retractions.items.insert(key, entry);
1267            }
1268        }
1269        Ok(())
1270    }
1271
1272    #[instrument(level = "debug")]
1273    fn apply_comment_update(
1274        &mut self,
1275        comment: mz_catalog::durable::Comment,
1276        diff: StateDiff,
1277        _retractions: &mut InProgressRetractions,
1278    ) {
1279        match diff {
1280            StateDiff::Addition => {
1281                let prev = Arc::make_mut(&mut self.comments).update_comment(
1282                    comment.object_id,
1283                    comment.sub_component,
1284                    Some(comment.comment),
1285                );
1286                assert_eq!(
1287                    prev, None,
1288                    "values must be explicitly retracted before inserting a new value"
1289                );
1290            }
1291            StateDiff::Retraction => {
1292                let prev = Arc::make_mut(&mut self.comments).update_comment(
1293                    comment.object_id,
1294                    comment.sub_component,
1295                    None,
1296                );
1297                assert_eq!(
1298                    prev,
1299                    Some(comment.comment),
1300                    "retraction does not match existing value: ({:?}, {:?})",
1301                    comment.object_id,
1302                    comment.sub_component,
1303                );
1304            }
1305        }
1306    }
1307
1308    #[instrument(level = "debug")]
1309    fn apply_source_references_update(
1310        &mut self,
1311        source_references: mz_catalog::durable::SourceReferences,
1312        diff: StateDiff,
1313        _retractions: &mut InProgressRetractions,
1314    ) {
1315        match diff {
1316            StateDiff::Addition => {
1317                let prev = self
1318                    .source_references
1319                    .insert(source_references.source_id, source_references.into());
1320                assert!(
1321                    prev.is_none(),
1322                    "values must be explicitly retracted before inserting a new value: {prev:?}"
1323                );
1324            }
1325            StateDiff::Retraction => {
1326                let prev = self.source_references.remove(&source_references.source_id);
1327                assert!(
1328                    prev.is_some(),
1329                    "retraction for a non-existent existing value: {source_references:?}"
1330                );
1331            }
1332        }
1333    }
1334
1335    #[instrument(level = "debug")]
1336    fn apply_storage_collection_metadata_update(
1337        &mut self,
1338        storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1339        diff: StateDiff,
1340        _retractions: &mut InProgressRetractions,
1341    ) {
1342        apply_inverted_lookup(
1343            &mut Arc::make_mut(&mut self.storage_metadata).collection_metadata,
1344            &storage_collection_metadata.id,
1345            storage_collection_metadata.shard,
1346            diff,
1347        );
1348    }
1349
1350    #[instrument(level = "debug")]
1351    fn apply_unfinalized_shard_update(
1352        &mut self,
1353        unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1354        diff: StateDiff,
1355        _retractions: &mut InProgressRetractions,
1356    ) {
1357        match diff {
1358            StateDiff::Addition => {
1359                let newly_inserted = Arc::make_mut(&mut self.storage_metadata)
1360                    .unfinalized_shards
1361                    .insert(unfinalized_shard.shard);
1362                assert!(
1363                    newly_inserted,
1364                    "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1365                );
1366            }
1367            StateDiff::Retraction => {
1368                let removed = Arc::make_mut(&mut self.storage_metadata)
1369                    .unfinalized_shards
1370                    .remove(&unfinalized_shard.shard);
1371                assert!(
1372                    removed,
1373                    "retraction does not match existing value: {unfinalized_shard:?}"
1374                );
1375            }
1376        }
1377    }
1378
1379    /// Generate a list of `BuiltinTableUpdate`s that correspond to a list of updates made to the
1380    /// durable catalog.
1381    #[instrument]
1382    pub(crate) fn generate_builtin_table_updates(
1383        &self,
1384        updates: Vec<StateUpdate>,
1385    ) -> Vec<BuiltinTableUpdate> {
1386        let mut builtin_table_updates = Vec::new();
1387        for StateUpdate { kind, ts: _, diff } in updates {
1388            let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1389            let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1390            builtin_table_updates.extend(builtin_table_update);
1391        }
1392        builtin_table_updates
1393    }
1394
1395    /// Generate a list of `BuiltinTableUpdate`s that correspond to a single update made to the
1396    /// durable catalog.
1397    #[instrument(level = "debug")]
1398    pub(crate) fn generate_builtin_table_update(
1399        &self,
1400        kind: StateUpdateKind,
1401        diff: StateDiff,
1402    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1403        let diff = diff.into();
1404        match kind {
1405            StateUpdateKind::Role(role) => self.pack_role_update(role.id, diff),
1406            StateUpdateKind::RoleAuth(role_auth) => {
1407                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1408            }
1409            StateUpdateKind::DefaultPrivilege(default_privilege) => {
1410                vec![self.pack_default_privileges_update(
1411                    &default_privilege.object,
1412                    &default_privilege.acl_item.grantee,
1413                    &default_privilege.acl_item.acl_mode,
1414                    diff,
1415                )]
1416            }
1417            StateUpdateKind::SystemPrivilege(system_privilege) => {
1418                vec![self.pack_system_privileges_update(system_privilege, diff)]
1419            }
1420            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1421            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1422            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1423                self.pack_item_update(introspection_source_index.item_id, diff)
1424            }
1425            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1426                cluster_replica.cluster_id,
1427                &cluster_replica.name,
1428                diff,
1429            ),
1430            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1431                // Runtime-alterable system objects have real entries in the
1432                // items collection and so get handled through the normal
1433                // `StateUpdateKind::Item`.`
1434                if !system_object_mapping.unique_identifier.runtime_alterable() {
1435                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1436                } else {
1437                    vec![]
1438                }
1439            }
1440            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1441            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1442            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1443                comment.object_id,
1444                comment.sub_component,
1445                &comment.comment,
1446                diff,
1447            )],
1448            StateUpdateKind::SourceReferences(source_references) => {
1449                self.pack_source_references_update(&source_references, diff)
1450            }
1451            StateUpdateKind::AuditLog(audit_log) => {
1452                vec![
1453                    self.pack_audit_log_update(&audit_log.event, diff)
1454                        .expect("could not pack audit log update"),
1455                ]
1456            }
1457            StateUpdateKind::Database(_)
1458            | StateUpdateKind::Schema(_)
1459            | StateUpdateKind::NetworkPolicy(_)
1460            | StateUpdateKind::StorageCollectionMetadata(_)
1461            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1462        }
1463    }
1464
1465    fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1466        self.entry_by_id
1467            .get_mut(id)
1468            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1469    }
1470
1471    fn get_schema_mut(
1472        &mut self,
1473        database_spec: &ResolvedDatabaseSpecifier,
1474        schema_spec: &SchemaSpecifier,
1475        conn_id: &ConnectionId,
1476    ) -> &mut Schema {
1477        // Keep in sync with `get_schemas`
1478        match (database_spec, schema_spec) {
1479            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1480                .temporary_schemas
1481                .get_mut(conn_id)
1482                .expect("catalog out of sync"),
1483            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1484                .ambient_schemas_by_id
1485                .get_mut(id)
1486                .expect("catalog out of sync"),
1487            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1488                .database_by_id
1489                .get_mut(database_id)
1490                .expect("catalog out of sync")
1491                .schemas_by_id
1492                .get_mut(schema_id)
1493                .expect("catalog out of sync"),
1494            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1495                unreachable!("temporary schemas are in the ambient database")
1496            }
1497        }
1498    }
1499
    /// Install builtin views to the catalog. This is its own function so that views can be
    /// optimized in parallel.
    ///
    /// The implementation is similar to `apply_updates_for_bootstrap` and determines dependency
    /// problems by sniffing out specific errors and then retrying once those dependencies are
    /// complete. This doesn't work for everything (casts, function implementations) so we also need
    /// to have a bucket for everything at the end. Additionally, because this executes in parallel,
    /// we must maintain a completed set otherwise races could result in orphaned views languishing
    /// in awaiting with nothing retriggering the attempt.
    #[instrument(name = "catalog::parse_views")]
    async fn parse_builtin_views(
        state: &mut CatalogState,
        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
        // Split the views into ones that have a pending retraction (these are being re-inserted)
        // and ones that are genuinely new and must be parsed from scratch.
        let (updates, additions): (Vec<_>, Vec<_>) =
            builtin_views
                .into_iter()
                .partition_map(|(view, item_id, gid)| {
                    match retractions.system_object_mappings.remove(&item_id) {
                        Some(entry) => Either::Left(entry),
                        None => Either::Right((view, item_id, gid)),
                    }
                });

        for entry in updates {
            // This implies that we updated the fingerprint for some builtin view. The retraction
            // was parsed, planned, and optimized using the compiled in definition, not the
            // definition from a previous version. So we can just stick the old entry back into the
            // catalog.
            let item_id = entry.id();
            state.insert_entry(entry);
            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
        }

        // In-flight parse tasks for views currently being processed.
        let mut handles = Vec::new();
        // Views blocked on a specific dependency, keyed by the missing item's ID.
        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
            BTreeMap::new();
        // Views blocked on a specific dependency, keyed by the missing item's full name.
        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
        // Some errors are due to the implementation of casts or SQL functions that depend on some
        // view. Instead of figuring out the exact view dependency, delay these until the end.
        let mut awaiting_all = Vec::new();
        // Completed views, needed to avoid race conditions.
        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
        let mut completed_names: BTreeSet<String> = BTreeSet::new();

        // Avoid some reference lifetime issues by not passing `builtin` into the spawned task.
        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
            .into_iter()
            .map(|(view, item_id, gid)| (item_id, (view, gid)))
            .collect();
        let item_ids: Vec<_> = views.keys().copied().collect();

        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
            if handles.is_empty() && ready.is_empty() {
                // Enqueue the views that were waiting for all the others.
                ready.extend(awaiting_all.drain(..));
            }

            // Spawn tasks for all ready views.
            if !ready.is_empty() {
                let spawn_state = Arc::new(state.clone());
                while let Some(id) = ready.pop_front() {
                    let (view, global_id) = views.get(&id).expect("must exist");
                    let global_id = *global_id;
                    let create_sql = view.create_sql();
                    // Views can't be versioned.
                    let versions = BTreeMap::new();

                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
                    let task_state = Arc::clone(&spawn_state);
                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
                    // Parsing/planning is CPU-bound, so run it on a blocking task.
                    let handle = mz_ore::task::spawn_blocking(
                        || "parse view",
                        move || {
                            span.in_scope(|| {
                                let res = task_state.parse_item_inner(
                                    global_id,
                                    &create_sql,
                                    &versions,
                                    None,
                                    false,
                                    None,
                                    cached_expr,
                                    None,
                                );
                                (id, global_id, res)
                            })
                        },
                    );
                    handles.push(handle);
                }
            }

            // Wait for a view to be ready.
            let (selected, _idx, remaining) = future::select_all(handles).await;
            handles = remaining;
            let (id, global_id, res) = selected;
            // Return a cached expression (removed before spawning) to the cache so it isn't
            // lost when the parse attempt has to be retried later.
            let mut insert_cached_expr = |cached_expr| {
                if let Some(cached_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
                }
            };
            match res {
                Ok((item, uncached_expr)) => {
                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                        local_expression_cache.insert_uncached_expression(
                            global_id,
                            uncached_expr,
                            optimizer_features,
                        );
                    }
                    // Add item to catalog.
                    let (view, _gid) = views.remove(&id).expect("must exist");
                    let schema_id = state
                        .ambient_schemas_by_name
                        .get(view.schema)
                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
                    let qname = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(*schema_id),
                        },
                        item: view.name.into(),
                    };
                    let mut acl_items = vec![rbac::owner_privilege(
                        mz_sql::catalog::ObjectType::View,
                        MZ_SYSTEM_ROLE_ID,
                    )];
                    acl_items.extend_from_slice(&view.access);

                    state.insert_item(
                        id,
                        view.oid,
                        qname,
                        item,
                        MZ_SYSTEM_ROLE_ID,
                        PrivilegeMap::from_mz_acl_items(acl_items),
                    );

                    // Enqueue any items waiting on this dependency.
                    let mut resolved_dependent_items = Vec::new();
                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    let entry = state.get_entry(&id);
                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    ready.extend(resolved_dependent_items);

                    // Record completion so a concurrently-failing view that names this one as
                    // its missing dependency gets re-enqueued instead of waiting forever.
                    completed_ids.insert(id);
                    completed_names.insert(full_name);
                }
                // If we were missing a dependency, wait for it to be added.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    if completed_ids.contains(&missing_dep) {
                        ready.push_back(id);
                    } else {
                        awaiting_id_dependencies
                            .entry(missing_dep)
                            .or_default()
                            .push(id);
                    }
                }
                // If we were missing a dependency, wait for it to be added.
                Err((
                    AdapterError::PlanError(plan::PlanError::Catalog(
                        SqlCatalogError::UnknownItem(missing_dep),
                    )),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    // The missing dependency may be reported either as an ID or as a name;
                    // try the ID interpretation first.
                    match CatalogItemId::from_str(&missing_dep) {
                        Ok(missing_dep) => {
                            if completed_ids.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_id_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                        Err(_) => {
                            if completed_names.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_name_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                    }
                }
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
                    cached_expr,
                )) => {
                    // Cast errors have no identifiable dependency; retry once everything else
                    // has been installed.
                    insert_cached_expr(cached_expr);
                    awaiting_all.push(id);
                }
                Err((e, _)) => {
                    let (bad_view, _gid) = views.get(&id).expect("must exist");
                    panic!(
                        "internal error: failed to load bootstrap view:\n\
                            {name}\n\
                            error:\n\
                            {e:?}\n\n\
                            Make sure that the schema name is specified in the builtin view's create sql statement.
                            ",
                        name = bad_view.name,
                    )
                }
            }
        }

        // Every view must have been installed by now; anything left over indicates a bug in
        // the dependency-retry bookkeeping above.
        assert!(awaiting_id_dependencies.is_empty());
        assert!(
            awaiting_name_dependencies.is_empty(),
            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
        );
        assert!(awaiting_all.is_empty());
        assert!(views.is_empty());

        // Generate a builtin table update for all the new views.
        builtin_table_updates.extend(
            item_ids
                .into_iter()
                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
        );

        builtin_table_updates
    }
1744
1745    /// Associates a name, `CatalogItemId`, and entry.
1746    fn insert_entry(&mut self, entry: CatalogEntry) {
1747        if !entry.id.is_system() {
1748            if let Some(cluster_id) = entry.item.cluster_id() {
1749                self.clusters_by_id
1750                    .get_mut(&cluster_id)
1751                    .expect("catalog out of sync")
1752                    .bound_objects
1753                    .insert(entry.id);
1754            };
1755        }
1756
1757        for u in entry.references().items() {
1758            match self.entry_by_id.get_mut(u) {
1759                Some(metadata) => metadata.referenced_by.push(entry.id()),
1760                None => panic!(
1761                    "Catalog: missing dependent catalog item {} while installing {}",
1762                    &u,
1763                    self.resolve_full_name(entry.name(), entry.conn_id())
1764                ),
1765            }
1766        }
1767        for u in entry.uses() {
1768            // Ignore self for self-referential tasks (e.g. Continual Tasks), if
1769            // present.
1770            if u == entry.id() {
1771                continue;
1772            }
1773            match self.entry_by_id.get_mut(&u) {
1774                Some(metadata) => metadata.used_by.push(entry.id()),
1775                None => panic!(
1776                    "Catalog: missing dependent catalog item {} while installing {}",
1777                    &u,
1778                    self.resolve_full_name(entry.name(), entry.conn_id())
1779                ),
1780            }
1781        }
1782        for gid in entry.item.global_ids() {
1783            self.entry_by_global_id.insert(gid, entry.id());
1784        }
1785        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1786        // Lazily create the temporary schema if this is a temporary item and the schema
1787        // doesn't exist yet.
1788        if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
1789            && !self.temporary_schemas.contains_key(conn_id)
1790        {
1791            self.create_temporary_schema(conn_id, entry.owner_id)
1792                .expect("failed to create temporary schema");
1793        }
1794        let schema = self.get_schema_mut(
1795            &entry.name().qualifiers.database_spec,
1796            &entry.name().qualifiers.schema_spec,
1797            conn_id,
1798        );
1799
1800        let prev_id = match entry.item() {
1801            CatalogItem::Func(_) => schema
1802                .functions
1803                .insert(entry.name().item.clone(), entry.id()),
1804            CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1805            _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1806        };
1807
1808        assert!(
1809            prev_id.is_none(),
1810            "builtin name collision on {:?}",
1811            entry.name().item.clone()
1812        );
1813
1814        self.entry_by_id.insert(entry.id(), entry.clone());
1815    }
1816
1817    /// Associates a name, [`CatalogItemId`], and entry.
1818    fn insert_item(
1819        &mut self,
1820        id: CatalogItemId,
1821        oid: u32,
1822        name: QualifiedItemName,
1823        item: CatalogItem,
1824        owner_id: RoleId,
1825        privileges: PrivilegeMap,
1826    ) {
1827        let entry = CatalogEntry {
1828            item,
1829            name,
1830            id,
1831            oid,
1832            used_by: Vec::new(),
1833            referenced_by: Vec::new(),
1834            owner_id,
1835            privileges,
1836        };
1837
1838        self.insert_entry(entry);
1839    }
1840
1841    #[mz_ore::instrument(level = "trace")]
1842    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1843        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1844        for u in metadata.references().items() {
1845            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1846                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1847            }
1848        }
1849        for u in metadata.uses() {
1850            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1851                dep_metadata.used_by.retain(|u| *u != metadata.id())
1852            }
1853        }
1854        for gid in metadata.global_ids() {
1855            self.entry_by_global_id.remove(&gid);
1856        }
1857
1858        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1859        let schema = self.get_schema_mut(
1860            &metadata.name().qualifiers.database_spec,
1861            &metadata.name().qualifiers.schema_spec,
1862            conn_id,
1863        );
1864        if metadata.item_type() == CatalogItemType::Type {
1865            schema
1866                .types
1867                .remove(&metadata.name().item)
1868                .expect("catalog out of sync");
1869        } else {
1870            // Functions would need special handling, but we don't yet support
1871            // dropping functions.
1872            assert_ne!(metadata.item_type(), CatalogItemType::Func);
1873
1874            schema
1875                .items
1876                .remove(&metadata.name().item)
1877                .expect("catalog out of sync");
1878        };
1879
1880        if !id.is_system() {
1881            if let Some(cluster_id) = metadata.item().cluster_id() {
1882                assert!(
1883                    self.clusters_by_id
1884                        .get_mut(&cluster_id)
1885                        .expect("catalog out of sync")
1886                        .bound_objects
1887                        .remove(&id),
1888                    "catalog out of sync"
1889                );
1890            }
1891        }
1892
1893        metadata
1894    }
1895
1896    fn insert_introspection_source_index(
1897        &mut self,
1898        cluster_id: ClusterId,
1899        log: &'static BuiltinLog,
1900        item_id: CatalogItemId,
1901        global_id: GlobalId,
1902        oid: u32,
1903    ) {
1904        let (index_name, index) =
1905            self.create_introspection_source_index(cluster_id, log, global_id);
1906        self.insert_item(
1907            item_id,
1908            oid,
1909            index_name,
1910            index,
1911            MZ_SYSTEM_ROLE_ID,
1912            PrivilegeMap::default(),
1913        );
1914    }
1915
1916    fn create_introspection_source_index(
1917        &self,
1918        cluster_id: ClusterId,
1919        log: &'static BuiltinLog,
1920        global_id: GlobalId,
1921    ) -> (QualifiedItemName, CatalogItem) {
1922        let source_name = FullItemName {
1923            database: RawDatabaseSpecifier::Ambient,
1924            schema: log.schema.into(),
1925            item: log.name.into(),
1926        };
1927        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1928        let mut index_name = QualifiedItemName {
1929            qualifiers: ItemQualifiers {
1930                database_spec: ResolvedDatabaseSpecifier::Ambient,
1931                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1932            },
1933            item: index_name.clone(),
1934        };
1935        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1936        let index_item_name = index_name.item.clone();
1937        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1938        let index = CatalogItem::Index(Index {
1939            global_id,
1940            on: log_global_id,
1941            keys: log
1942                .variant
1943                .index_by()
1944                .into_iter()
1945                .map(MirScalarExpr::column)
1946                .collect(),
1947            create_sql: index_sql(
1948                index_item_name,
1949                cluster_id,
1950                source_name,
1951                &log.variant.desc(),
1952                &log.variant.index_by(),
1953            ),
1954            conn_id: None,
1955            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1956            cluster_id,
1957            is_retained_metrics_object: false,
1958            custom_logical_compaction_window: None,
1959        });
1960        (index_name, index)
1961    }
1962
1963    /// Insert system configuration `name` with `value`.
1964    ///
1965    /// Return a `bool` value indicating whether the configuration was modified
1966    /// by the call.
1967    fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1968        Ok(Arc::make_mut(&mut self.system_configuration).set(name, value)?)
1969    }
1970
1971    /// Reset system configuration `name`.
1972    ///
1973    /// Return a `bool` value indicating whether the configuration was modified
1974    /// by the call.
1975    fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1976        Ok(Arc::make_mut(&mut self.system_configuration).reset(name)?)
1977    }
1978}
1979
1980/// Sort [`StateUpdate`]s in dependency order.
1981///
1982/// # Panics
1983///
1984/// This function assumes that all provided `updates` have the same timestamp and will panic
1985/// otherwise. It also requires that the provided `updates` are consolidated, i.e. all contained
1986/// `StateUpdateKinds` are unique.
1987fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1988    fn push_update<T>(
1989        update: T,
1990        diff: StateDiff,
1991        retractions: &mut Vec<T>,
1992        additions: &mut Vec<T>,
1993    ) {
1994        match diff {
1995            StateDiff::Retraction => retractions.push(update),
1996            StateDiff::Addition => additions.push(update),
1997        }
1998    }
1999
2000    soft_assert_no_log!(
2001        updates.iter().map(|update| update.ts).all_equal(),
2002        "all timestamps should be equal: {updates:?}"
2003    );
2004    soft_assert_no_log!(
2005        {
2006            let mut dedup = BTreeSet::new();
2007            updates.iter().all(|update| dedup.insert(&update.kind))
2008        },
2009        "updates should be consolidated: {updates:?}"
2010    );
2011
2012    // Partition updates by type so that we can weave different update types into the right spots.
2013    let mut pre_cluster_retractions = Vec::new();
2014    let mut pre_cluster_additions = Vec::new();
2015    let mut cluster_retractions = Vec::new();
2016    let mut cluster_additions = Vec::new();
2017    let mut builtin_item_updates = Vec::new();
2018    let mut item_retractions = Vec::new();
2019    let mut item_additions = Vec::new();
2020    let mut temp_item_retractions = Vec::new();
2021    let mut temp_item_additions = Vec::new();
2022    let mut post_item_retractions = Vec::new();
2023    let mut post_item_additions = Vec::new();
2024    for update in updates {
2025        let diff = update.diff.clone();
2026        match update.kind {
2027            StateUpdateKind::Role(_)
2028            | StateUpdateKind::RoleAuth(_)
2029            | StateUpdateKind::Database(_)
2030            | StateUpdateKind::Schema(_)
2031            | StateUpdateKind::DefaultPrivilege(_)
2032            | StateUpdateKind::SystemPrivilege(_)
2033            | StateUpdateKind::SystemConfiguration(_)
2034            | StateUpdateKind::NetworkPolicy(_) => push_update(
2035                update,
2036                diff,
2037                &mut pre_cluster_retractions,
2038                &mut pre_cluster_additions,
2039            ),
2040            StateUpdateKind::Cluster(_)
2041            | StateUpdateKind::IntrospectionSourceIndex(_)
2042            | StateUpdateKind::ClusterReplica(_) => push_update(
2043                update,
2044                diff,
2045                &mut cluster_retractions,
2046                &mut cluster_additions,
2047            ),
2048            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
2049                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
2050            }
2051            StateUpdateKind::TemporaryItem(item) => push_update(
2052                (item, update.ts, update.diff),
2053                diff,
2054                &mut temp_item_retractions,
2055                &mut temp_item_additions,
2056            ),
2057            StateUpdateKind::Item(item) => push_update(
2058                (item, update.ts, update.diff),
2059                diff,
2060                &mut item_retractions,
2061                &mut item_additions,
2062            ),
2063            StateUpdateKind::Comment(_)
2064            | StateUpdateKind::SourceReferences(_)
2065            | StateUpdateKind::AuditLog(_)
2066            | StateUpdateKind::StorageCollectionMetadata(_)
2067            | StateUpdateKind::UnfinalizedShard(_) => push_update(
2068                update,
2069                diff,
2070                &mut post_item_retractions,
2071                &mut post_item_additions,
2072            ),
2073        }
2074    }
2075
2076    // Sort builtin item updates by dependency. The builtin definition order must be a dependency
2077    // order, so we can use that to avoid a more expensive topo sorting.
2078    let builtin_item_updates = builtin_item_updates
2079        .into_iter()
2080        .map(|(system_object_mapping, ts, diff)| {
2081            let idx = BUILTIN_LOOKUP
2082                .get(&system_object_mapping.description)
2083                .expect("missing builtin")
2084                .0;
2085            (idx, system_object_mapping, ts, diff)
2086        })
2087        .sorted_by_key(|(idx, _, _, _)| *idx)
2088        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2089
2090    // Partition builtins based on whether or not they should be applied before or after clusters.
2091    // Clusters depend on sources (introspection logs), so sources need to be applied first.
2092    // Everything else can be applied after clusters.
2093    let mut builtin_source_retractions = Vec::new();
2094    let mut builtin_source_additions = Vec::new();
2095    let mut other_builtin_retractions = Vec::new();
2096    let mut other_builtin_additions = Vec::new();
2097    for (builtin_item_update, ts, diff) in builtin_item_updates {
2098        let object_type = builtin_item_update.description.object_type;
2099        let update = StateUpdate {
2100            kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2101            ts,
2102            diff,
2103        };
2104        if object_type == CatalogItemType::Source {
2105            push_update(
2106                update,
2107                diff,
2108                &mut builtin_source_retractions,
2109                &mut builtin_source_additions,
2110            );
2111        } else {
2112            push_update(
2113                update,
2114                diff,
2115                &mut other_builtin_retractions,
2116                &mut other_builtin_additions,
2117            );
2118        }
2119    }
2120
2121    /// Sort items by their dependencies using topological sort.
2122    ///
2123    /// # Panics
2124    ///
2125    /// This function requires that all provided items have unique item IDs.
2126    fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2127        tracing::debug!(?items, "sorting items by dependencies");
2128
2129        let key_fn = |item: &(mz_catalog::durable::Item, _, _)| item.0.id;
2130        let dependencies_fn = |item: &(mz_catalog::durable::Item, _, _)| {
2131            let statement = mz_sql::parse::parse(&item.0.create_sql)
2132                .expect("valid create_sql")
2133                .into_element()
2134                .ast;
2135            mz_sql::names::dependencies(&statement).expect("failed to find dependencies of item")
2136        };
2137        sort_topological(items, key_fn, dependencies_fn);
2138    }
2139
2140    /// Sort item updates by dependency.
2141    ///
2142    /// First we group items into groups that are totally ordered by dependency. For example, when
2143    /// sorting all items by dependency we know that all tables can come after all sources, because
2144    /// a source can never depend on a table. Second, we sort the items in each group in
2145    /// topological order, or by ID, depending on the type.
2146    ///
2147    /// It used to be the case that the ID order of ALL items matched the dependency order. However,
2148    /// certain migrations shuffled item IDs around s.t. this was no longer true. A much better
2149    /// approach would be to investigate each item, discover their exact dependencies, and then
2150    /// perform a topological sort. This is non-trivial because we only have the CREATE SQL of each
2151    /// item here. Within the SQL the dependent items are sometimes referred to by ID and sometimes
2152    /// referred to by name.
2153    ///
2154    /// The logic of this function should match [`sort_temp_item_updates`].
2155    fn sort_item_updates(
2156        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2157    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2158        // Partition items into groups s.t. each item in one group has a predefined order with all
2159        // items in other groups. For example, all sinks are ordered greater than all tables.
2160        let mut types = Vec::new();
2161        // N.B. Functions can depend on system tables, but not user tables.
2162        // TODO(udf): This will change when UDFs are supported.
2163        let mut funcs = Vec::new();
2164        let mut secrets = Vec::new();
2165        let mut connections = Vec::new();
2166        let mut sources = Vec::new();
2167        let mut tables = Vec::new();
2168        let mut derived_items = Vec::new();
2169        let mut sinks = Vec::new();
2170        let mut continual_tasks = Vec::new();
2171
2172        for update in item_updates {
2173            match update.0.item_type() {
2174                CatalogItemType::Type => types.push(update),
2175                CatalogItemType::Func => funcs.push(update),
2176                CatalogItemType::Secret => secrets.push(update),
2177                CatalogItemType::Connection => connections.push(update),
2178                CatalogItemType::Source => sources.push(update),
2179                CatalogItemType::Table => tables.push(update),
2180                CatalogItemType::View
2181                | CatalogItemType::MaterializedView
2182                | CatalogItemType::Index => derived_items.push(update),
2183                CatalogItemType::Sink => sinks.push(update),
2184                CatalogItemType::ContinualTask => continual_tasks.push(update),
2185            }
2186        }
2187
2188        // For some groups, the items in them can depend on each other and can be `ALTER`ed so that
2189        // an item ends up depending on an item with a greater ID. Thus we need to perform
2190        // topological sort for these groups.
2191        sort_items_topological(&mut connections);
2192        sort_items_topological(&mut derived_items);
2193
2194        // Other groups we can simply sort by ID.
2195        for group in [
2196            &mut types,
2197            &mut funcs,
2198            &mut secrets,
2199            &mut sources,
2200            &mut tables,
2201            &mut sinks,
2202            &mut continual_tasks,
2203        ] {
2204            group.sort_by_key(|(item, _, _)| item.id);
2205        }
2206
2207        iter::empty()
2208            .chain(types)
2209            .chain(funcs)
2210            .chain(secrets)
2211            .chain(connections)
2212            .chain(sources)
2213            .chain(tables)
2214            .chain(derived_items)
2215            .chain(sinks)
2216            .chain(continual_tasks)
2217            .collect()
2218    }
2219
2220    let item_retractions = sort_item_updates(item_retractions);
2221    let item_additions = sort_item_updates(item_additions);
2222
2223    /// Sort temporary item updates by dependency.
2224    ///
2225    /// The logic of this function should match [`sort_item_updates`].
2226    fn sort_temp_item_updates(
2227        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2228    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2229        // Partition items into groups s.t. each item in one group has a predefined order with all
2230        // items in other groups. For example, all sinks are ordered greater than all tables.
2231        let mut types = Vec::new();
2232        // N.B. Functions can depend on system tables, but not user tables.
2233        let mut funcs = Vec::new();
2234        let mut secrets = Vec::new();
2235        let mut connections = Vec::new();
2236        let mut sources = Vec::new();
2237        let mut tables = Vec::new();
2238        let mut derived_items = Vec::new();
2239        let mut sinks = Vec::new();
2240        let mut continual_tasks = Vec::new();
2241
2242        for update in temp_item_updates {
2243            match update.0.item_type() {
2244                CatalogItemType::Type => types.push(update),
2245                CatalogItemType::Func => funcs.push(update),
2246                CatalogItemType::Secret => secrets.push(update),
2247                CatalogItemType::Connection => connections.push(update),
2248                CatalogItemType::Source => sources.push(update),
2249                CatalogItemType::Table => tables.push(update),
2250                CatalogItemType::View
2251                | CatalogItemType::MaterializedView
2252                | CatalogItemType::Index => derived_items.push(update),
2253                CatalogItemType::Sink => sinks.push(update),
2254                CatalogItemType::ContinualTask => continual_tasks.push(update),
2255            }
2256        }
2257
2258        // Within each group, sort by ID.
2259        for group in [
2260            &mut types,
2261            &mut funcs,
2262            &mut secrets,
2263            &mut connections,
2264            &mut sources,
2265            &mut tables,
2266            &mut derived_items,
2267            &mut sinks,
2268            &mut continual_tasks,
2269        ] {
2270            group.sort_by_key(|(item, _, _)| item.id);
2271        }
2272
2273        iter::empty()
2274            .chain(types)
2275            .chain(funcs)
2276            .chain(secrets)
2277            .chain(connections)
2278            .chain(sources)
2279            .chain(tables)
2280            .chain(derived_items)
2281            .chain(sinks)
2282            .chain(continual_tasks)
2283            .collect()
2284    }
2285    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2286    let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2287
2288    /// Merge sorted temporary and non-temp items.
2289    fn merge_item_updates(
2290        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2291        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2292    ) -> Vec<StateUpdate> {
2293        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2294
2295        while let (Some((item, _, _)), Some((temp_item, _, _))) =
2296            (item_updates.front(), temp_item_updates.front())
2297        {
2298            if item.id < temp_item.id {
2299                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2300                state_updates.push(StateUpdate {
2301                    kind: StateUpdateKind::Item(item),
2302                    ts,
2303                    diff,
2304                });
2305            } else if item.id > temp_item.id {
2306                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2307                state_updates.push(StateUpdate {
2308                    kind: StateUpdateKind::TemporaryItem(temp_item),
2309                    ts,
2310                    diff,
2311                });
2312            } else {
2313                unreachable!(
2314                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2315                );
2316            }
2317        }
2318
2319        while let Some((item, ts, diff)) = item_updates.pop_front() {
2320            state_updates.push(StateUpdate {
2321                kind: StateUpdateKind::Item(item),
2322                ts,
2323                diff,
2324            });
2325        }
2326
2327        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2328            state_updates.push(StateUpdate {
2329                kind: StateUpdateKind::TemporaryItem(temp_item),
2330                ts,
2331                diff,
2332            });
2333        }
2334
2335        state_updates
2336    }
2337    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2338    let item_additions = merge_item_updates(item_additions, temp_item_additions);
2339
2340    // Put everything back together.
2341    iter::empty()
2342        // All retractions must be reversed.
2343        .chain(post_item_retractions.into_iter().rev())
2344        .chain(item_retractions.into_iter().rev())
2345        .chain(other_builtin_retractions.into_iter().rev())
2346        .chain(cluster_retractions.into_iter().rev())
2347        .chain(builtin_source_retractions.into_iter().rev())
2348        .chain(pre_cluster_retractions.into_iter().rev())
2349        .chain(pre_cluster_additions)
2350        .chain(builtin_source_additions)
2351        .chain(cluster_additions)
2352        .chain(other_builtin_additions)
2353        .chain(item_additions)
2354        .chain(post_item_additions)
2355        .collect()
2356}
2357
/// Groups of updates of certain types are applied in batches to improve
/// performance. A constraint is that updates must be applied in order. This
/// process is modeled as a state machine that batches then applies groups of
/// updates.
///
/// Each variant holds the batch currently being accumulated; stepping the
/// state machine with an update of a different variant first applies the
/// accumulated batch, so overall application order is preserved.
enum ApplyState {
    /// Additions of builtin views.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Item updates that aren't builtin view additions.
    ///
    /// This contains all updates whose application requires calling
    /// `parse_item` and thus toggling the `enable_for_item_parsing` feature
    /// flags.
    Items(Vec<StateUpdate>),
    /// All other updates.
    Updates(Vec<StateUpdate>),
}
2374
2375impl ApplyState {
2376    fn new(update: StateUpdate) -> Self {
2377        use StateUpdateKind::*;
2378        match &update.kind {
2379            SystemObjectMapping(som)
2380                if som.description.object_type == CatalogItemType::View
2381                    && update.diff == StateDiff::Addition =>
2382            {
2383                let view_addition = lookup_builtin_view_addition(som.clone());
2384                Self::BuiltinViewAdditions(vec![view_addition])
2385            }
2386
2387            IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
2388                Self::Items(vec![update])
2389            }
2390
2391            Role(_)
2392            | RoleAuth(_)
2393            | Database(_)
2394            | Schema(_)
2395            | DefaultPrivilege(_)
2396            | SystemPrivilege(_)
2397            | SystemConfiguration(_)
2398            | Cluster(_)
2399            | NetworkPolicy(_)
2400            | ClusterReplica(_)
2401            | SourceReferences(_)
2402            | Comment(_)
2403            | AuditLog(_)
2404            | StorageCollectionMetadata(_)
2405            | UnfinalizedShard(_) => Self::Updates(vec![update]),
2406        }
2407    }
2408
2409    /// Apply all updates that have been batched in `self`.
2410    ///
2411    /// We make sure to enable all "enable_for_item_parsing" feature flags when applying item
2412    /// updates during bootstrap. See [`CatalogState::with_enable_for_item_parsing`] for more
2413    /// details.
2414    async fn apply(
2415        self,
2416        state: &mut CatalogState,
2417        retractions: &mut InProgressRetractions,
2418        local_expression_cache: &mut LocalExpressionCache,
2419    ) -> (
2420        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2421        Vec<ParsedStateUpdate>,
2422    ) {
2423        match self {
2424            Self::BuiltinViewAdditions(builtin_view_additions) => {
2425                let restore = Arc::clone(&state.system_configuration);
2426                Arc::make_mut(&mut state.system_configuration).enable_for_item_parsing();
2427                let builtin_table_updates = CatalogState::parse_builtin_views(
2428                    state,
2429                    builtin_view_additions,
2430                    retractions,
2431                    local_expression_cache,
2432                )
2433                .await;
2434                state.system_configuration = restore;
2435                (builtin_table_updates, Vec::new())
2436            }
2437            Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
2438                state
2439                    .apply_updates_inner(updates, retractions, local_expression_cache)
2440                    .expect("corrupt catalog")
2441            }),
2442            Self::Updates(updates) => state
2443                .apply_updates_inner(updates, retractions, local_expression_cache)
2444                .expect("corrupt catalog"),
2445        }
2446    }
2447
2448    async fn step(
2449        self,
2450        next: Self,
2451        state: &mut CatalogState,
2452        retractions: &mut InProgressRetractions,
2453        local_expression_cache: &mut LocalExpressionCache,
2454    ) -> (
2455        Self,
2456        (
2457            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2458            Vec<ParsedStateUpdate>,
2459        ),
2460    ) {
2461        match (self, next) {
2462            (
2463                Self::BuiltinViewAdditions(mut builtin_view_additions),
2464                Self::BuiltinViewAdditions(next_builtin_view_additions),
2465            ) => {
2466                // Continue batching builtin view additions.
2467                builtin_view_additions.extend(next_builtin_view_additions);
2468                (
2469                    Self::BuiltinViewAdditions(builtin_view_additions),
2470                    (Vec::new(), Vec::new()),
2471                )
2472            }
2473            (Self::Items(mut updates), Self::Items(next_updates)) => {
2474                // Continue batching item updates.
2475                updates.extend(next_updates);
2476                (Self::Items(updates), (Vec::new(), Vec::new()))
2477            }
2478            (Self::Updates(mut updates), Self::Updates(next_updates)) => {
2479                // Continue batching updates.
2480                updates.extend(next_updates);
2481                (Self::Updates(updates), (Vec::new(), Vec::new()))
2482            }
2483            (apply_state, next_apply_state) => {
2484                // Apply the current batch and start batching new apply state.
2485                let updates = apply_state
2486                    .apply(state, retractions, local_expression_cache)
2487                    .await;
2488                (next_apply_state, updates)
2489            }
2490        }
2491    }
2492}
2493
/// Trait abstracting over map operations needed by [`apply_inverted_lookup`] and
/// [`apply_with_update`]. Both [`BTreeMap`] and [`imbl::OrdMap`] implement this.
trait MutableMap<K, V> {
    /// Inserts `value` at `key`, returning the previously stored value, if any.
    fn insert(&mut self, key: K, value: V) -> Option<V>;
    /// Removes and returns the value stored at `key`, if any.
    fn remove(&mut self, key: &K) -> Option<V>;
}
2500
2501impl<K: Ord, V> MutableMap<K, V> for BTreeMap<K, V> {
2502    fn insert(&mut self, key: K, value: V) -> Option<V> {
2503        BTreeMap::insert(self, key, value)
2504    }
2505    fn remove(&mut self, key: &K) -> Option<V> {
2506        BTreeMap::remove(self, key)
2507    }
2508}
2509
2510impl<K: Ord + Clone, V: Clone> MutableMap<K, V> for imbl::OrdMap<K, V> {
2511    fn insert(&mut self, key: K, value: V) -> Option<V> {
2512        imbl::OrdMap::insert(self, key, value)
2513    }
2514    fn remove(&mut self, key: &K) -> Option<V> {
2515        imbl::OrdMap::remove(self, key)
2516    }
2517}
2518
2519/// Helper method to updated inverted lookup maps. The keys are generally names and the values are
2520/// generally IDs.
2521///
2522/// Importantly, when retracting it's expected that the existing value will match `value` exactly.
2523fn apply_inverted_lookup<K, V>(map: &mut impl MutableMap<K, V>, key: &K, value: V, diff: StateDiff)
2524where
2525    K: Ord + Clone + Debug,
2526    V: PartialEq + Debug,
2527{
2528    match diff {
2529        StateDiff::Retraction => {
2530            let prev = map.remove(key);
2531            assert_eq!(
2532                prev,
2533                Some(value),
2534                "retraction does not match existing value: {key:?}"
2535            );
2536        }
2537        StateDiff::Addition => {
2538            let prev = map.insert(key.clone(), value);
2539            assert_eq!(
2540                prev, None,
2541                "values must be explicitly retracted before inserting a new value: {key:?}"
2542            );
2543        }
2544    }
2545}
2546
2547/// Helper method to update catalog state, that may need to be updated from a previously retracted
2548/// object.
2549fn apply_with_update<K, V, D>(
2550    map: &mut impl MutableMap<K, V>,
2551    durable: D,
2552    key_fn: impl FnOnce(&D) -> K,
2553    diff: StateDiff,
2554    retractions: &mut BTreeMap<D::Key, V>,
2555) where
2556    K: Ord,
2557    V: UpdateFrom<D> + PartialEq + Debug,
2558    D: DurableType,
2559    D::Key: Ord,
2560{
2561    match diff {
2562        StateDiff::Retraction => {
2563            let mem_key = key_fn(&durable);
2564            let value = map
2565                .remove(&mem_key)
2566                .expect("retraction does not match existing value: {key:?}");
2567            let durable_key = durable.into_key_value().0;
2568            retractions.insert(durable_key, value);
2569        }
2570        StateDiff::Addition => {
2571            let mem_key = key_fn(&durable);
2572            let durable_key = durable.key();
2573            let value = match retractions.remove(&durable_key) {
2574                Some(mut retraction) => {
2575                    retraction.update_from(durable);
2576                    retraction
2577                }
2578                None => durable.into(),
2579            };
2580            let prev = map.insert(mem_key, value);
2581            assert_eq!(
2582                prev, None,
2583                "values must be explicitly retracted before inserting a new value"
2584            );
2585        }
2586    }
2587}
2588
2589/// Looks up a [`BuiltinView`] from a [`SystemObjectMapping`].
2590fn lookup_builtin_view_addition(
2591    mapping: SystemObjectMapping,
2592) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2593    let (_, builtin) = BUILTIN_LOOKUP
2594        .get(&mapping.description)
2595        .expect("missing builtin view");
2596    let Builtin::View(view) = builtin else {
2597        unreachable!("programming error, expected BuiltinView found {builtin:?}");
2598    };
2599
2600    (
2601        view,
2602        mapping.unique_identifier.catalog_id,
2603        mapping.unique_identifier.global_id,
2604    )
2605}