Skip to main content

mz_adapter/catalog/
apply.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to applying updates from a [`mz_catalog::durable::DurableCatalogState`] to a
11//! [`CatalogState`].
12
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25    BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28    ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29    SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, Func, Index, Log, NetworkPolicy,
35    Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36    TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_compute_types::dataflows::DataflowDescription;
40use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_ore::collections::CollectionExt;
44use mz_ore::tracing::OpenTelemetryContext;
45use mz_ore::{instrument, soft_assert_eq_or_log, soft_assert_no_log, soft_assert_or_log};
46use mz_pgrepr::oid::INVALID_OID;
47use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
48use mz_repr::role_id::RoleId;
49use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
50use mz_sql::catalog::CatalogError as SqlCatalogError;
51use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
52use mz_sql::names::{
53    FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
54    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
55};
56use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
57use mz_sql::session::vars::{VarError, VarInput};
58use mz_sql::{plan, rbac};
59use mz_sql_parser::ast::Expr;
60use mz_storage_types::sources::Timeline;
61use mz_transform::dataflow::DataflowMetainfo;
62use mz_transform::notice::OptimizerNotice;
63use tracing::{info_span, warn};
64
65use crate::AdapterError;
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{BuiltinTableUpdate, CatalogState};
68use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
69use crate::util::{index_sql, sort_topological};
70
71/// Maintains the state of retractions while applying catalog state updates for a single timestamp.
72/// [`CatalogState`] maintains denormalized state for certain catalog objects. Updating an object
73/// results in applying a retraction for that object followed by applying an addition for that
74/// object. When applying those additions it can be extremely expensive to re-build that
75/// denormalized state from scratch. To avoid that issue we stash the denormalized state from
76/// retractions, so it can be used during additions.
77///
78/// Not all objects maintain denormalized state, so we only stash the retractions for the subset of
79/// objects that maintain denormalized state.
80// TODO(jkosh44) It might be simpler or more future proof to include all object types here, even if
81// the update step is a no-op for certain types.
82#[derive(Debug, Clone, Default)]
83struct InProgressRetractions {
84    roles: BTreeMap<RoleKey, Role>,
85    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
86    databases: BTreeMap<DatabaseKey, Database>,
87    schemas: BTreeMap<SchemaKey, Schema>,
88    clusters: BTreeMap<ClusterKey, Cluster>,
89    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
90    items: BTreeMap<ItemKey, CatalogEntry>,
91    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
92    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
93    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
94}
95
96impl CatalogState {
97    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
98    ///
99    /// Returns builtin table updates corresponding to the changes to catalog state.
100    #[must_use]
101    #[instrument]
102    pub(crate) async fn apply_updates(
103        &mut self,
104        updates: Vec<StateUpdate>,
105        local_expression_cache: &mut LocalExpressionCache,
106    ) -> (
107        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
108        Vec<ParsedStateUpdate>,
109    ) {
110        let mut builtin_table_updates = Vec::with_capacity(updates.len());
111        let mut catalog_updates = Vec::with_capacity(updates.len());
112
113        // First, consolidate updates. The code that applies parsed state
114        // updates _requires_ that the given updates are consolidated. There
115        // must be at most one addition and/or one retraction for a given item,
116        // as identified by that items ID type.
117        let updates = Self::consolidate_updates(updates);
118
119        // Apply updates in groups, according to their timestamps.
120        let mut groups: Vec<Vec<_>> = Vec::new();
121        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
122            // Bring the updates into the pseudo-topological order that we need
123            // for updating our in-memory state and generating builtin table
124            // updates.
125            let updates = sort_updates(updates.collect());
126            groups.push(updates);
127        }
128
129        for updates in groups {
130            let mut apply_state = ApplyState::Updates(Vec::new());
131            let mut retractions = InProgressRetractions::default();
132
133            for update in updates {
134                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
135                    .step(
136                        ApplyState::new(update),
137                        self,
138                        &mut retractions,
139                        local_expression_cache,
140                    )
141                    .await;
142                apply_state = next_apply_state;
143                builtin_table_updates.extend(builtin_table_update);
144                catalog_updates.extend(catalog_update);
145            }
146
147            // Apply remaining state.
148            let (builtin_table_update, catalog_update) = apply_state
149                .apply(self, &mut retractions, local_expression_cache)
150                .await;
151            builtin_table_updates.extend(builtin_table_update);
152            catalog_updates.extend(catalog_update);
153
154            // Clean up plans and optimizer notices for items that
155            // were retracted but not replaced (i.e., truly dropped).
156            let dropped_entries: Vec<CatalogEntry> = retractions
157                .items
158                .into_values()
159                .chain(retractions.temp_items.into_values())
160                .collect();
161            if !dropped_entries.is_empty() {
162                let dropped_notices = self.drop_optimizer_notices(dropped_entries);
163                if self.system_config().enable_mz_notices() {
164                    self.pack_optimizer_notice_updates(
165                        &mut builtin_table_updates,
166                        dropped_notices.iter(),
167                        Diff::MINUS_ONE,
168                    );
169                }
170            }
171        }
172
173        (builtin_table_updates, catalog_updates)
174    }
175
176    /// It can happen that the sequencing logic creates "fluctuating" updates
177    /// for a given catalog ID. For example, when doing a `DROP OWNED BY ...`,
178    /// for a table, there will be a retraction of the original table state,
179    /// then an addition for the same table but stripped of some of the roles
180    /// and access things, and then a retraction for that intermediate table
181    /// state. By consolidating, the intermediate state addition/retraction will
182    /// cancel out and we'll only see the retraction for the original state.
183    fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
184        let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
185            .into_iter()
186            .map(|update| (update.kind, update.ts, update.diff.into()))
187            .collect_vec();
188
189        consolidate_updates(&mut updates);
190
191        updates
192            .into_iter()
193            .map(|(kind, ts, diff)| StateUpdate {
194                kind,
195                ts,
196                diff: diff
197                    .try_into()
198                    .expect("catalog state cannot have diff other than -1 or 1"),
199            })
200            .collect_vec()
201    }
202
203    #[instrument(level = "debug")]
204    fn apply_updates_inner(
205        &mut self,
206        updates: Vec<StateUpdate>,
207        retractions: &mut InProgressRetractions,
208        local_expression_cache: &mut LocalExpressionCache,
209    ) -> Result<
210        (
211            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
212            Vec<ParsedStateUpdate>,
213        ),
214        CatalogError,
215    > {
216        soft_assert_no_log!(
217            updates.iter().map(|update| update.ts).all_equal(),
218            "all timestamps should be equal: {updates:?}"
219        );
220
221        let mut update_system_config = false;
222
223        let mut builtin_table_updates = Vec::with_capacity(updates.len());
224        let mut catalog_updates = Vec::new();
225
226        for state_update in updates {
227            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
228                update_system_config = true;
229            }
230
231            match state_update.diff {
232                StateDiff::Retraction => {
233                    // We want the parsed catalog updates to match the state of
234                    // the catalog _before_ applying a retraction. So that we can
235                    // still have useful in-memory state to work with.
236                    if let Some(update) =
237                        parsed_state_updates::parse_state_update(self, state_update.clone())
238                    {
239                        catalog_updates.push(update);
240                    }
241
242                    // We want the builtin table retraction to match the state of the catalog
243                    // before applying the update.
244                    builtin_table_updates.extend(self.generate_builtin_table_update(
245                        state_update.kind.clone(),
246                        state_update.diff,
247                    ));
248                    self.apply_update(
249                        state_update.kind,
250                        state_update.diff,
251                        retractions,
252                        local_expression_cache,
253                    )?;
254                }
255                StateDiff::Addition => {
256                    self.apply_update(
257                        state_update.kind.clone(),
258                        state_update.diff,
259                        retractions,
260                        local_expression_cache,
261                    )?;
262                    // We want the builtin table addition to match the state of
263                    // the catalog after applying the update. So that we already
264                    // have useful in-memory state to work with.
265                    builtin_table_updates.extend(self.generate_builtin_table_update(
266                        state_update.kind.clone(),
267                        state_update.diff,
268                    ));
269
270                    // We want the parsed catalog updates to match the state of
271                    // the catalog _after_ applying an addition.
272                    if let Some(update) =
273                        parsed_state_updates::parse_state_update(self, state_update.clone())
274                    {
275                        catalog_updates.push(update);
276                    }
277                }
278            }
279        }
280
281        if update_system_config {
282            self.system_configuration.dyncfg_updates();
283        }
284
285        Ok((builtin_table_updates, catalog_updates))
286    }
287
288    #[instrument(level = "debug")]
289    fn apply_update(
290        &mut self,
291        kind: StateUpdateKind,
292        diff: StateDiff,
293        retractions: &mut InProgressRetractions,
294        local_expression_cache: &mut LocalExpressionCache,
295    ) -> Result<(), CatalogError> {
296        match kind {
297            StateUpdateKind::Role(role) => {
298                self.apply_role_update(role, diff, retractions);
299            }
300            StateUpdateKind::RoleAuth(role_auth) => {
301                self.apply_role_auth_update(role_auth, diff, retractions);
302            }
303            StateUpdateKind::Database(database) => {
304                self.apply_database_update(database, diff, retractions);
305            }
306            StateUpdateKind::Schema(schema) => {
307                self.apply_schema_update(schema, diff, retractions);
308            }
309            StateUpdateKind::DefaultPrivilege(default_privilege) => {
310                self.apply_default_privilege_update(default_privilege, diff, retractions);
311            }
312            StateUpdateKind::SystemPrivilege(system_privilege) => {
313                self.apply_system_privilege_update(system_privilege, diff, retractions);
314            }
315            StateUpdateKind::SystemConfiguration(system_configuration) => {
316                self.apply_system_configuration_update(system_configuration, diff, retractions);
317            }
318            StateUpdateKind::Cluster(cluster) => {
319                self.apply_cluster_update(cluster, diff, retractions);
320            }
321            StateUpdateKind::NetworkPolicy(network_policy) => {
322                self.apply_network_policy_update(network_policy, diff, retractions);
323            }
324            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
325                self.apply_introspection_source_index_update(
326                    introspection_source_index,
327                    diff,
328                    retractions,
329                );
330            }
331            StateUpdateKind::ClusterReplica(cluster_replica) => {
332                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
333            }
334            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
335                self.apply_system_object_mapping_update(
336                    system_object_mapping,
337                    diff,
338                    retractions,
339                    local_expression_cache,
340                );
341            }
342            StateUpdateKind::TemporaryItem(item) => {
343                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
344            }
345            StateUpdateKind::Item(item) => {
346                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
347            }
348            StateUpdateKind::Comment(comment) => {
349                self.apply_comment_update(comment, diff, retractions);
350            }
351            StateUpdateKind::SourceReferences(source_reference) => {
352                self.apply_source_references_update(source_reference, diff, retractions);
353            }
354            StateUpdateKind::AuditLog(_audit_log) => {
355                // Audit logs are not stored in-memory.
356            }
357            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
358                self.apply_storage_collection_metadata_update(
359                    storage_collection_metadata,
360                    diff,
361                    retractions,
362                );
363            }
364            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
365                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
366            }
367        }
368
369        Ok(())
370    }
371
372    #[instrument(level = "debug")]
373    fn apply_role_auth_update(
374        &mut self,
375        role_auth: mz_catalog::durable::RoleAuth,
376        diff: StateDiff,
377        retractions: &mut InProgressRetractions,
378    ) {
379        apply_with_update(
380            &mut self.role_auth_by_id,
381            role_auth,
382            |role_auth| role_auth.role_id,
383            diff,
384            &mut retractions.role_auths,
385        );
386    }
387
388    #[instrument(level = "debug")]
389    fn apply_role_update(
390        &mut self,
391        role: mz_catalog::durable::Role,
392        diff: StateDiff,
393        retractions: &mut InProgressRetractions,
394    ) {
395        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
396        apply_with_update(
397            &mut self.roles_by_id,
398            role,
399            |role| role.id,
400            diff,
401            &mut retractions.roles,
402        );
403    }
404
405    #[instrument(level = "debug")]
406    fn apply_database_update(
407        &mut self,
408        database: mz_catalog::durable::Database,
409        diff: StateDiff,
410        retractions: &mut InProgressRetractions,
411    ) {
412        apply_inverted_lookup(
413            &mut self.database_by_name,
414            &database.name,
415            database.id,
416            diff,
417        );
418        apply_with_update(
419            &mut self.database_by_id,
420            database,
421            |database| database.id,
422            diff,
423            &mut retractions.databases,
424        );
425    }
426
427    #[instrument(level = "debug")]
428    fn apply_schema_update(
429        &mut self,
430        schema: mz_catalog::durable::Schema,
431        diff: StateDiff,
432        retractions: &mut InProgressRetractions,
433    ) {
434        match &schema.database_id {
435            Some(database_id) => {
436                let db = self
437                    .database_by_id
438                    .get_mut(database_id)
439                    .expect("catalog out of sync");
440                apply_inverted_lookup(&mut db.schemas_by_name, &schema.name, schema.id, diff);
441                apply_with_update(
442                    &mut db.schemas_by_id,
443                    schema,
444                    |schema| schema.id,
445                    diff,
446                    &mut retractions.schemas,
447                );
448            }
449            None => {
450                apply_inverted_lookup(
451                    &mut self.ambient_schemas_by_name,
452                    &schema.name,
453                    schema.id,
454                    diff,
455                );
456                apply_with_update(
457                    &mut self.ambient_schemas_by_id,
458                    schema,
459                    |schema| schema.id,
460                    diff,
461                    &mut retractions.schemas,
462                );
463            }
464        }
465    }
466
467    #[instrument(level = "debug")]
468    fn apply_default_privilege_update(
469        &mut self,
470        default_privilege: mz_catalog::durable::DefaultPrivilege,
471        diff: StateDiff,
472        _retractions: &mut InProgressRetractions,
473    ) {
474        match diff {
475            StateDiff::Addition => Arc::make_mut(&mut self.default_privileges)
476                .grant(default_privilege.object, default_privilege.acl_item),
477            StateDiff::Retraction => Arc::make_mut(&mut self.default_privileges)
478                .revoke(&default_privilege.object, &default_privilege.acl_item),
479        }
480    }
481
482    #[instrument(level = "debug")]
483    fn apply_system_privilege_update(
484        &mut self,
485        system_privilege: MzAclItem,
486        diff: StateDiff,
487        _retractions: &mut InProgressRetractions,
488    ) {
489        match diff {
490            StateDiff::Addition => {
491                Arc::make_mut(&mut self.system_privileges).grant(system_privilege)
492            }
493            StateDiff::Retraction => {
494                Arc::make_mut(&mut self.system_privileges).revoke(&system_privilege)
495            }
496        }
497    }
498
499    #[instrument(level = "debug")]
500    fn apply_system_configuration_update(
501        &mut self,
502        system_configuration: mz_catalog::durable::SystemConfiguration,
503        diff: StateDiff,
504        _retractions: &mut InProgressRetractions,
505    ) {
506        let res = match diff {
507            StateDiff::Addition => self.insert_system_configuration(
508                &system_configuration.name,
509                VarInput::Flat(&system_configuration.value),
510            ),
511            StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
512        };
513        match res {
514            Ok(_) => (),
515            // When system variables are deleted, nothing deletes them from the underlying
516            // durable catalog, which isn't great. Still, we need to be able to ignore
517            // unknown variables.
518            Err(Error {
519                kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
520            }) => {
521                warn!(%name, "unknown system parameter from catalog storage");
522            }
523            Err(e) => panic!("unable to update system variable: {e:?}"),
524        }
525    }
526
527    #[instrument(level = "debug")]
528    fn apply_cluster_update(
529        &mut self,
530        cluster: mz_catalog::durable::Cluster,
531        diff: StateDiff,
532        retractions: &mut InProgressRetractions,
533    ) {
534        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
535        apply_with_update(
536            &mut self.clusters_by_id,
537            cluster,
538            |cluster| cluster.id,
539            diff,
540            &mut retractions.clusters,
541        );
542    }
543
544    #[instrument(level = "debug")]
545    fn apply_network_policy_update(
546        &mut self,
547        policy: mz_catalog::durable::NetworkPolicy,
548        diff: StateDiff,
549        retractions: &mut InProgressRetractions,
550    ) {
551        apply_inverted_lookup(
552            &mut self.network_policies_by_name,
553            &policy.name,
554            policy.id,
555            diff,
556        );
557        apply_with_update(
558            &mut self.network_policies_by_id,
559            policy,
560            |policy| policy.id,
561            diff,
562            &mut retractions.network_policies,
563        );
564    }
565
566    #[instrument(level = "debug")]
567    fn apply_introspection_source_index_update(
568        &mut self,
569        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
570        diff: StateDiff,
571        retractions: &mut InProgressRetractions,
572    ) {
573        let cluster = self
574            .clusters_by_id
575            .get_mut(&introspection_source_index.cluster_id)
576            .expect("catalog out of sync");
577        let log = BUILTIN_LOG_LOOKUP
578            .get(introspection_source_index.name.as_str())
579            .expect("missing log");
580        apply_inverted_lookup(
581            &mut cluster.log_indexes,
582            &log.variant,
583            introspection_source_index.index_id,
584            diff,
585        );
586
587        match diff {
588            StateDiff::Addition => {
589                if let Some(mut entry) = retractions
590                    .introspection_source_indexes
591                    .remove(&introspection_source_index.item_id)
592                {
593                    // This should only happen during startup as a result of builtin migrations. We
594                    // create a new index item and replace the old one with it.
595                    let (index_name, index) = self.create_introspection_source_index(
596                        introspection_source_index.cluster_id,
597                        log,
598                        introspection_source_index.index_id,
599                    );
600                    assert_eq!(entry.id, introspection_source_index.item_id);
601                    assert_eq!(entry.oid, introspection_source_index.oid);
602                    assert_eq!(entry.name, index_name);
603                    entry.item = index;
604                    self.insert_entry(entry);
605                } else {
606                    self.insert_introspection_source_index(
607                        introspection_source_index.cluster_id,
608                        log,
609                        introspection_source_index.item_id,
610                        introspection_source_index.index_id,
611                        introspection_source_index.oid,
612                    );
613                }
614            }
615            StateDiff::Retraction => {
616                let entry = self.drop_item(introspection_source_index.item_id);
617                retractions
618                    .introspection_source_indexes
619                    .insert(entry.id, entry);
620            }
621        }
622    }
623
624    #[instrument(level = "debug")]
625    fn apply_cluster_replica_update(
626        &mut self,
627        cluster_replica: mz_catalog::durable::ClusterReplica,
628        diff: StateDiff,
629        _retractions: &mut InProgressRetractions,
630    ) {
631        let cluster = self
632            .clusters_by_id
633            .get(&cluster_replica.cluster_id)
634            .expect("catalog out of sync");
635        let azs = cluster.availability_zones();
636        let location = self
637            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
638            .expect("catalog in unexpected state");
639        let cluster = self
640            .clusters_by_id
641            .get_mut(&cluster_replica.cluster_id)
642            .expect("catalog out of sync");
643        apply_inverted_lookup(
644            &mut cluster.replica_id_by_name_,
645            &cluster_replica.name,
646            cluster_replica.replica_id,
647            diff,
648        );
649        match diff {
650            StateDiff::Retraction => {
651                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
652                assert!(
653                    prev.is_some(),
654                    "retraction does not match existing value: {:?}",
655                    cluster_replica.replica_id
656                );
657            }
658            StateDiff::Addition => {
659                let logging = ReplicaLogging {
660                    log_logging: cluster_replica.config.logging.log_logging,
661                    interval: cluster_replica.config.logging.interval,
662                };
663                let config = ReplicaConfig {
664                    location,
665                    compute: ComputeReplicaConfig { logging },
666                };
667                let mem_cluster_replica = ClusterReplica {
668                    name: cluster_replica.name.clone(),
669                    cluster_id: cluster_replica.cluster_id,
670                    replica_id: cluster_replica.replica_id,
671                    config,
672                    owner_id: cluster_replica.owner_id,
673                };
674                let prev = cluster
675                    .replicas_by_id_
676                    .insert(cluster_replica.replica_id, mem_cluster_replica);
677                assert_eq!(
678                    prev, None,
679                    "values must be explicitly retracted before inserting a new value: {:?}",
680                    cluster_replica.replica_id
681                );
682            }
683        }
684    }
685
686    #[instrument(level = "debug")]
687    fn apply_system_object_mapping_update(
688        &mut self,
689        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
690        diff: StateDiff,
691        retractions: &mut InProgressRetractions,
692        local_expression_cache: &mut LocalExpressionCache,
693    ) {
694        let item_id = system_object_mapping.unique_identifier.catalog_id;
695        let global_id = system_object_mapping.unique_identifier.global_id;
696
697        if system_object_mapping.unique_identifier.runtime_alterable() {
698            // Runtime-alterable system objects have real entries in the items
699            // collection and so get handled through the normal `insert_item`
700            // and `drop_item` code paths.
701            return;
702        }
703
704        if let StateDiff::Retraction = diff {
705            let entry = self.drop_item(item_id);
706            retractions.system_object_mappings.insert(item_id, entry);
707            return;
708        }
709
710        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
711            // This implies that we updated the fingerprint for some builtin item. The retraction
712            // was parsed, planned, and optimized using the compiled in definition, not the
713            // definition from a previous version. So we can just stick the old entry back into the
714            // catalog.
715            self.insert_entry(entry);
716            return;
717        }
718
719        let builtin = BUILTIN_LOOKUP
720            .get(&system_object_mapping.description)
721            .expect("missing builtin")
722            .1;
723        let schema_name = builtin.schema();
724        let schema_id = self
725            .ambient_schemas_by_name
726            .get(schema_name)
727            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
728        let name = QualifiedItemName {
729            qualifiers: ItemQualifiers {
730                database_spec: ResolvedDatabaseSpecifier::Ambient,
731                schema_spec: SchemaSpecifier::Id(*schema_id),
732            },
733            item: builtin.name().into(),
734        };
735        match builtin {
736            Builtin::Log(log) => {
737                let mut acl_items = vec![rbac::owner_privilege(
738                    mz_sql::catalog::ObjectType::Source,
739                    MZ_SYSTEM_ROLE_ID,
740                )];
741                acl_items.extend_from_slice(&log.access);
742                self.insert_item(
743                    item_id,
744                    log.oid,
745                    name.clone(),
746                    CatalogItem::Log(Log {
747                        variant: log.variant,
748                        global_id,
749                    }),
750                    MZ_SYSTEM_ROLE_ID,
751                    PrivilegeMap::from_mz_acl_items(acl_items),
752                );
753            }
754
755            Builtin::Table(table) => {
756                let mut acl_items = vec![rbac::owner_privilege(
757                    mz_sql::catalog::ObjectType::Table,
758                    MZ_SYSTEM_ROLE_ID,
759                )];
760                acl_items.extend_from_slice(&table.access);
761
762                self.insert_item(
763                    item_id,
764                    table.oid,
765                    name.clone(),
766                    CatalogItem::Table(Table {
767                        create_sql: None,
768                        desc: VersionedRelationDesc::new(table.desc.clone()),
769                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
770                        conn_id: None,
771                        resolved_ids: ResolvedIds::empty(),
772                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
773                            || {
774                                self.system_config()
775                                    .metrics_retention()
776                                    .try_into()
777                                    .expect("invalid metrics retention")
778                            },
779                        ),
780                        is_retained_metrics_object: table.is_retained_metrics_object,
781                        data_source: TableDataSource::TableWrites {
782                            defaults: vec![Expr::null(); table.desc.arity()],
783                        },
784                    }),
785                    MZ_SYSTEM_ROLE_ID,
786                    PrivilegeMap::from_mz_acl_items(acl_items),
787                );
788            }
789            Builtin::Index(index) => {
790                let custom_logical_compaction_window =
791                    index.is_retained_metrics_object.then(|| {
792                        self.system_config()
793                            .metrics_retention()
794                            .try_into()
795                            .expect("invalid metrics retention")
796                    });
797                // Indexes can't be versioned.
798                let versions = BTreeMap::new();
799
800                let item = self
801                    .parse_item(
802                        global_id,
803                        &index.create_sql(),
804                        &versions,
805                        None,
806                        index.is_retained_metrics_object,
807                        custom_logical_compaction_window,
808                        local_expression_cache,
809                        None,
810                    )
811                    .unwrap_or_else(|e| {
812                        panic!(
813                            "internal error: failed to load bootstrap index:\n\
814                                    {}\n\
815                                    error:\n\
816                                    {:?}\n\n\
817                                    make sure that the schema name is specified in the builtin index's create sql statement.",
818                            index.name, e
819                        )
820                    });
821                let CatalogItem::Index(_) = item else {
822                    panic!(
823                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
824                        index.name
825                    );
826                };
827
828                self.insert_item(
829                    item_id,
830                    index.oid,
831                    name,
832                    item,
833                    MZ_SYSTEM_ROLE_ID,
834                    PrivilegeMap::default(),
835                );
836            }
837            Builtin::View(_) => {
838                // parse_views is responsible for inserting all builtin views.
839                unreachable!("views added elsewhere");
840            }
841
842            // Note: Element types must be loaded before array types.
843            Builtin::Type(typ) => {
844                let typ = self.resolve_builtin_type_references(typ);
845                if let CatalogType::Array { element_reference } = typ.details.typ {
846                    let entry = self.get_entry_mut(&element_reference);
847                    let item_type = match &mut entry.item {
848                        CatalogItem::Type(item_type) => item_type,
849                        _ => unreachable!("types can only reference other types"),
850                    };
851                    item_type.details.array_id = Some(item_id);
852                }
853
854                let schema_id = self.resolve_system_schema(typ.schema);
855
856                self.insert_item(
857                    item_id,
858                    typ.oid,
859                    QualifiedItemName {
860                        qualifiers: ItemQualifiers {
861                            database_spec: ResolvedDatabaseSpecifier::Ambient,
862                            schema_spec: SchemaSpecifier::Id(schema_id),
863                        },
864                        item: typ.name.to_owned(),
865                    },
866                    CatalogItem::Type(Type {
867                        create_sql: None,
868                        global_id,
869                        details: typ.details.clone(),
870                        resolved_ids: ResolvedIds::empty(),
871                    }),
872                    MZ_SYSTEM_ROLE_ID,
873                    PrivilegeMap::from_mz_acl_items(vec![
874                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
875                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
876                    ]),
877                );
878            }
879
880            Builtin::Func(func) => {
881                // This OID is never used. `func` has a `Vec` of implementations and
882                // each implementation has its own OID. Those are the OIDs that are
883                // actually used by the system.
884                let oid = INVALID_OID;
885                self.insert_item(
886                    item_id,
887                    oid,
888                    name.clone(),
889                    CatalogItem::Func(Func {
890                        inner: func.inner,
891                        global_id,
892                    }),
893                    MZ_SYSTEM_ROLE_ID,
894                    PrivilegeMap::default(),
895                );
896            }
897
898            Builtin::Source(coll) => {
899                let mut acl_items = vec![rbac::owner_privilege(
900                    mz_sql::catalog::ObjectType::Source,
901                    MZ_SYSTEM_ROLE_ID,
902                )];
903                acl_items.extend_from_slice(&coll.access);
904
905                self.insert_item(
906                    item_id,
907                    coll.oid,
908                    name.clone(),
909                    CatalogItem::Source(Source {
910                        create_sql: None,
911                        data_source: coll.data_source.clone(),
912                        desc: coll.desc.clone(),
913                        global_id,
914                        timeline: Timeline::EpochMilliseconds,
915                        resolved_ids: ResolvedIds::empty(),
916                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
917                            || {
918                                self.system_config()
919                                    .metrics_retention()
920                                    .try_into()
921                                    .expect("invalid metrics retention")
922                            },
923                        ),
924                        is_retained_metrics_object: coll.is_retained_metrics_object,
925                    }),
926                    MZ_SYSTEM_ROLE_ID,
927                    PrivilegeMap::from_mz_acl_items(acl_items),
928                );
929            }
930            Builtin::MaterializedView(mv) => {
931                let mut acl_items = vec![rbac::owner_privilege(
932                    mz_sql::catalog::ObjectType::MaterializedView,
933                    MZ_SYSTEM_ROLE_ID,
934                )];
935                acl_items.extend_from_slice(&mv.access);
936
937                let custom_logical_compaction_window = mv.is_retained_metrics_object.then(|| {
938                    self.system_config()
939                        .metrics_retention()
940                        .try_into()
941                        .expect("invalid metrics retention")
942                });
943
944                // Builtin materialized views can't be versioned.
945                let versions = BTreeMap::new();
946
947                let mut item = self
948                    .parse_item(
949                        global_id,
950                        &mv.create_sql(),
951                        &versions,
952                        None,
953                        mv.is_retained_metrics_object,
954                        custom_logical_compaction_window,
955                        local_expression_cache,
956                        None,
957                    )
958                    .unwrap_or_else(|e| {
959                        panic!(
960                            "internal error: failed to load bootstrap materialized view:\n\
961                             {}\n\
962                             error:\n\
963                             {e:?}\n\n\
964                             make sure that the schema name is specified in the builtin \
965                             materialized view's create sql statement.",
966                            mv.name,
967                        )
968                    });
969                let CatalogItem::MaterializedView(catalog_mv) = &mut item else {
970                    panic!(
971                        "internal error: builtin materialized view {}'s SQL does not begin \
972                         with \"CREATE MATERIALIZED VIEW\".",
973                        mv.name,
974                    );
975                };
976
977                // The optimizer can only infer keys from MV definitions, but cannot infer
978                // uniqueness present in the input data. Extend with the keys declared in the
979                // builtin definition, to allow supplying additional key knowledge.
980                let mut desc = catalog_mv.desc.latest();
981                for key in &mv.desc.typ().keys {
982                    desc = desc.with_key(key.clone());
983                }
984                catalog_mv.desc = VersionedRelationDesc::new(desc);
985
986                self.insert_item(
987                    item_id,
988                    mv.oid,
989                    name,
990                    item,
991                    MZ_SYSTEM_ROLE_ID,
992                    PrivilegeMap::from_mz_acl_items(acl_items),
993                );
994            }
995            Builtin::Connection(connection) => {
996                // Connections can't be versioned.
997                let versions = BTreeMap::new();
998                let mut item = self
999                    .parse_item(
1000                        global_id,
1001                        connection.sql,
1002                        &versions,
1003                        None,
1004                        false,
1005                        None,
1006                        local_expression_cache,
1007                        None,
1008                    )
1009                    .unwrap_or_else(|e| {
1010                        panic!(
1011                            "internal error: failed to load bootstrap connection:\n\
1012                                    {}\n\
1013                                    error:\n\
1014                                    {:?}\n\n\
1015                                    make sure that the schema name is specified in the builtin connection's create sql statement.",
1016                            connection.name, e
1017                        )
1018                    });
1019                let CatalogItem::Connection(_) = &mut item else {
1020                    panic!(
1021                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
1022                        connection.name
1023                    );
1024                };
1025
1026                let mut acl_items = vec![rbac::owner_privilege(
1027                    mz_sql::catalog::ObjectType::Connection,
1028                    connection.owner_id.clone(),
1029                )];
1030                acl_items.extend_from_slice(connection.access);
1031
1032                self.insert_item(
1033                    item_id,
1034                    connection.oid,
1035                    name.clone(),
1036                    item,
1037                    connection.owner_id.clone(),
1038                    PrivilegeMap::from_mz_acl_items(acl_items),
1039                );
1040            }
1041        }
1042    }
1043
1044    #[instrument(level = "debug")]
1045    fn apply_temporary_item_update(
1046        &mut self,
1047        temporary_item: TemporaryItem,
1048        diff: StateDiff,
1049        retractions: &mut InProgressRetractions,
1050        local_expression_cache: &mut LocalExpressionCache,
1051    ) {
1052        match diff {
1053            StateDiff::Addition => {
1054                let TemporaryItem {
1055                    id,
1056                    oid,
1057                    global_id,
1058                    schema_id,
1059                    name,
1060                    conn_id,
1061                    create_sql,
1062                    owner_id,
1063                    privileges,
1064                    extra_versions,
1065                } = temporary_item;
1066                // Lazily create the temporary schema if it doesn't exist yet.
1067                // We need the conn_id to create the schema, and it should always be Some for temp items.
1068                let temp_conn_id = conn_id
1069                    .as_ref()
1070                    .expect("temporary items must have a connection id");
1071                if !self.temporary_schemas.contains_key(temp_conn_id) {
1072                    self.create_temporary_schema(temp_conn_id, owner_id)
1073                        .expect("failed to create temporary schema");
1074                }
1075                let schema = self.find_temp_schema(&schema_id);
1076                let name = QualifiedItemName {
1077                    qualifiers: ItemQualifiers {
1078                        database_spec: schema.database().clone(),
1079                        schema_spec: schema.id().clone(),
1080                    },
1081                    item: name.clone(),
1082                };
1083
1084                let entry = match retractions.temp_items.remove(&id) {
1085                    Some(mut retraction) => {
1086                        assert_eq!(retraction.id, id);
1087
1088                        // We only reparse the SQL if it's changed. Otherwise, we use the existing
1089                        // item. This is a performance optimization and not needed for correctness.
1090                        // This makes it difficult to use the `UpdateFrom` trait, but the structure
1091                        // is still the same as the trait.
1092                        if retraction.create_sql() != create_sql {
1093                            let mut catalog_item = self
1094                                .deserialize_item(
1095                                    global_id,
1096                                    &create_sql,
1097                                    &extra_versions,
1098                                    local_expression_cache,
1099                                    Some(retraction.item),
1100                                )
1101                                .unwrap_or_else(|e| {
1102                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
1103                                });
1104                            // Have to patch up the item because parsing doesn't
1105                            // take into account temporary schemas/conn_id.
1106                            // NOTE(aljoscha): I don't like how we're patching
1107                            // this in here, but it's but one of the ways in
1108                            // which temporary items are a bit weird. So, here
1109                            // we are ...
1110                            catalog_item.set_conn_id(conn_id);
1111                            retraction.item = catalog_item;
1112                        }
1113
1114                        retraction.id = id;
1115                        retraction.oid = oid;
1116                        retraction.name = name;
1117                        retraction.owner_id = owner_id;
1118                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1119                        retraction
1120                    }
1121                    None => {
1122                        let mut catalog_item = self
1123                            .deserialize_item(
1124                                global_id,
1125                                &create_sql,
1126                                &extra_versions,
1127                                local_expression_cache,
1128                                None,
1129                            )
1130                            .unwrap_or_else(|e| {
1131                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1132                            });
1133
1134                        // Have to patch up the item because parsing doesn't
1135                        // take into account temporary schemas/conn_id.
1136                        // NOTE(aljoscha): I don't like how we're patching this
1137                        // in here, but it's but one of the ways in which
1138                        // temporary items are a bit weird. So, here we are ...
1139                        catalog_item.set_conn_id(conn_id);
1140
1141                        CatalogEntry {
1142                            item: catalog_item,
1143                            referenced_by: Vec::new(),
1144                            used_by: Vec::new(),
1145                            id,
1146                            oid,
1147                            name,
1148                            owner_id,
1149                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1150                        }
1151                    }
1152                };
1153                self.insert_entry(entry);
1154            }
1155            StateDiff::Retraction => {
1156                let entry = self.drop_item(temporary_item.id);
1157                retractions.temp_items.insert(temporary_item.id, entry);
1158            }
1159        }
1160    }
1161
1162    #[instrument(level = "debug")]
1163    fn apply_item_update(
1164        &mut self,
1165        item: mz_catalog::durable::Item,
1166        diff: StateDiff,
1167        retractions: &mut InProgressRetractions,
1168        local_expression_cache: &mut LocalExpressionCache,
1169    ) -> Result<(), CatalogError> {
1170        match diff {
1171            StateDiff::Addition => {
1172                let key = item.key();
1173                let mz_catalog::durable::Item {
1174                    id,
1175                    oid,
1176                    global_id,
1177                    schema_id,
1178                    name,
1179                    create_sql,
1180                    owner_id,
1181                    privileges,
1182                    extra_versions,
1183                } = item;
1184                let schema = self.find_non_temp_schema(&schema_id);
1185                let name = QualifiedItemName {
1186                    qualifiers: ItemQualifiers {
1187                        database_spec: schema.database().clone(),
1188                        schema_spec: schema.id().clone(),
1189                    },
1190                    item: name.clone(),
1191                };
1192                let entry = match retractions.items.remove(&key) {
1193                    Some(retraction) => {
1194                        assert_eq!(retraction.id, item.id);
1195
1196                        let item = self
1197                            .deserialize_item(
1198                                global_id,
1199                                &create_sql,
1200                                &extra_versions,
1201                                local_expression_cache,
1202                                Some(retraction.item),
1203                            )
1204                            .unwrap_or_else(|e| {
1205                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1206                            });
1207
1208                        CatalogEntry {
1209                            item,
1210                            id,
1211                            oid,
1212                            name,
1213                            owner_id,
1214                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1215                            referenced_by: retraction.referenced_by,
1216                            used_by: retraction.used_by,
1217                        }
1218                    }
1219                    None => {
1220                        let catalog_item = self
1221                            .deserialize_item(
1222                                global_id,
1223                                &create_sql,
1224                                &extra_versions,
1225                                local_expression_cache,
1226                                None,
1227                            )
1228                            .unwrap_or_else(|e| {
1229                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1230                            });
1231                        CatalogEntry {
1232                            item: catalog_item,
1233                            referenced_by: Vec::new(),
1234                            used_by: Vec::new(),
1235                            id,
1236                            oid,
1237                            name,
1238                            owner_id,
1239                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1240                        }
1241                    }
1242                };
1243
1244                self.insert_entry(entry);
1245            }
1246            StateDiff::Retraction => {
1247                let entry = self.drop_item(item.id);
1248                let key = item.into_key_value().0;
1249                retractions.items.insert(key, entry);
1250            }
1251        }
1252        Ok(())
1253    }
1254
1255    #[instrument(level = "debug")]
1256    fn apply_comment_update(
1257        &mut self,
1258        comment: mz_catalog::durable::Comment,
1259        diff: StateDiff,
1260        _retractions: &mut InProgressRetractions,
1261    ) {
1262        match diff {
1263            StateDiff::Addition => {
1264                let prev = Arc::make_mut(&mut self.comments).update_comment(
1265                    comment.object_id,
1266                    comment.sub_component,
1267                    Some(comment.comment),
1268                );
1269                assert_eq!(
1270                    prev, None,
1271                    "values must be explicitly retracted before inserting a new value"
1272                );
1273            }
1274            StateDiff::Retraction => {
1275                let prev = Arc::make_mut(&mut self.comments).update_comment(
1276                    comment.object_id,
1277                    comment.sub_component,
1278                    None,
1279                );
1280                assert_eq!(
1281                    prev,
1282                    Some(comment.comment),
1283                    "retraction does not match existing value: ({:?}, {:?})",
1284                    comment.object_id,
1285                    comment.sub_component,
1286                );
1287            }
1288        }
1289    }
1290
1291    #[instrument(level = "debug")]
1292    fn apply_source_references_update(
1293        &mut self,
1294        source_references: mz_catalog::durable::SourceReferences,
1295        diff: StateDiff,
1296        _retractions: &mut InProgressRetractions,
1297    ) {
1298        match diff {
1299            StateDiff::Addition => {
1300                let prev = self
1301                    .source_references
1302                    .insert(source_references.source_id, source_references.into());
1303                assert!(
1304                    prev.is_none(),
1305                    "values must be explicitly retracted before inserting a new value: {prev:?}"
1306                );
1307            }
1308            StateDiff::Retraction => {
1309                let prev = self.source_references.remove(&source_references.source_id);
1310                assert!(
1311                    prev.is_some(),
1312                    "retraction for a non-existent existing value: {source_references:?}"
1313                );
1314            }
1315        }
1316    }
1317
1318    #[instrument(level = "debug")]
1319    fn apply_storage_collection_metadata_update(
1320        &mut self,
1321        storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1322        diff: StateDiff,
1323        _retractions: &mut InProgressRetractions,
1324    ) {
1325        apply_inverted_lookup(
1326            &mut Arc::make_mut(&mut self.storage_metadata).collection_metadata,
1327            &storage_collection_metadata.id,
1328            storage_collection_metadata.shard,
1329            diff,
1330        );
1331    }
1332
1333    #[instrument(level = "debug")]
1334    fn apply_unfinalized_shard_update(
1335        &mut self,
1336        unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1337        diff: StateDiff,
1338        _retractions: &mut InProgressRetractions,
1339    ) {
1340        match diff {
1341            StateDiff::Addition => {
1342                let newly_inserted = Arc::make_mut(&mut self.storage_metadata)
1343                    .unfinalized_shards
1344                    .insert(unfinalized_shard.shard);
1345                assert!(
1346                    newly_inserted,
1347                    "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1348                );
1349            }
1350            StateDiff::Retraction => {
1351                let removed = Arc::make_mut(&mut self.storage_metadata)
1352                    .unfinalized_shards
1353                    .remove(&unfinalized_shard.shard);
1354                assert!(
1355                    removed,
1356                    "retraction does not match existing value: {unfinalized_shard:?}"
1357                );
1358            }
1359        }
1360    }
1361
1362    /// Generate a list of `BuiltinTableUpdate`s that correspond to a list of updates made to the
1363    /// durable catalog.
1364    #[instrument]
1365    pub(crate) fn generate_builtin_table_updates(
1366        &self,
1367        updates: Vec<StateUpdate>,
1368    ) -> Vec<BuiltinTableUpdate> {
1369        let mut builtin_table_updates = Vec::new();
1370        for StateUpdate { kind, ts: _, diff } in updates {
1371            let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1372            let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1373            builtin_table_updates.extend(builtin_table_update);
1374        }
1375        builtin_table_updates
1376    }
1377
1378    /// Generate a list of `BuiltinTableUpdate`s that correspond to a single update made to the
1379    /// durable catalog.
1380    #[instrument(level = "debug")]
1381    pub(crate) fn generate_builtin_table_update(
1382        &self,
1383        kind: StateUpdateKind,
1384        diff: StateDiff,
1385    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1386        let diff = diff.into();
1387        match kind {
1388            StateUpdateKind::Role(role) => self.pack_role_update(role.id, diff),
1389            StateUpdateKind::RoleAuth(role_auth) => {
1390                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1391            }
1392            StateUpdateKind::DefaultPrivilege(default_privilege) => {
1393                vec![self.pack_default_privileges_update(
1394                    &default_privilege.object,
1395                    &default_privilege.acl_item.grantee,
1396                    &default_privilege.acl_item.acl_mode,
1397                    diff,
1398                )]
1399            }
1400            StateUpdateKind::SystemPrivilege(system_privilege) => {
1401                vec![self.pack_system_privileges_update(system_privilege, diff)]
1402            }
1403            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1404            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1405            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1406                self.pack_item_update(introspection_source_index.item_id, diff)
1407            }
1408            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1409                cluster_replica.cluster_id,
1410                &cluster_replica.name,
1411                diff,
1412            ),
1413            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1414                // Runtime-alterable system objects have real entries in the
1415                // items collection and so get handled through the normal
1416                // `StateUpdateKind::Item`.`
1417                if !system_object_mapping.unique_identifier.runtime_alterable() {
1418                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1419                } else {
1420                    vec![]
1421                }
1422            }
1423            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1424            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1425            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1426                comment.object_id,
1427                comment.sub_component,
1428                &comment.comment,
1429                diff,
1430            )],
1431            StateUpdateKind::SourceReferences(source_references) => {
1432                self.pack_source_references_update(&source_references, diff)
1433            }
1434            StateUpdateKind::AuditLog(audit_log) => {
1435                vec![
1436                    self.pack_audit_log_update(&audit_log.event, diff)
1437                        .expect("could not pack audit log update"),
1438                ]
1439            }
1440            StateUpdateKind::Database(_)
1441            | StateUpdateKind::Schema(_)
1442            | StateUpdateKind::NetworkPolicy(_)
1443            | StateUpdateKind::StorageCollectionMetadata(_)
1444            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1445        }
1446    }
1447
1448    fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1449        self.entry_by_id
1450            .get_mut(id)
1451            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1452    }
1453
1454    /// Set the optimized plan for the item identified by `id`.
1455    ///
1456    /// # Panics
1457    /// If the item is not an `Index` or `MaterializedView`.
1458    pub(super) fn set_optimized_plan(
1459        &mut self,
1460        id: GlobalId,
1461        plan: DataflowDescription<mz_expr::OptimizedMirRelationExpr>,
1462    ) {
1463        let item_id = self.entry_by_global_id[&id];
1464        let entry = self.get_entry_mut(&item_id);
1465        match entry.item_mut() {
1466            CatalogItem::Index(idx) => idx.optimized_plan = Some(Arc::new(plan)),
1467            CatalogItem::MaterializedView(mv) => mv.optimized_plan = Some(Arc::new(plan)),
1468            other => panic!("set_optimized_plan called on {} ({:?})", id, other.typ()),
1469        }
1470    }
1471
1472    /// Set the physical plan for the item identified by `id`.
1473    ///
1474    /// # Panics
1475    /// If the item is not an `Index` or `MaterializedView`.
1476    pub(super) fn set_physical_plan(
1477        &mut self,
1478        id: GlobalId,
1479        plan: DataflowDescription<mz_compute_types::plan::Plan>,
1480    ) {
1481        let item_id = self.entry_by_global_id[&id];
1482        let entry = self.get_entry_mut(&item_id);
1483        match entry.item_mut() {
1484            CatalogItem::Index(idx) => idx.physical_plan = Some(Arc::new(plan)),
1485            CatalogItem::MaterializedView(mv) => mv.physical_plan = Some(Arc::new(plan)),
1486            other => panic!("set_physical_plan called on {} ({:?})", id, other.typ()),
1487        }
1488    }
1489
1490    /// Set the `DataflowMetainfo` for the item identified by `id`.
1491    ///
1492    /// # Panics
1493    /// If the item is not an `Index` or `MaterializedView`.
1494    pub(super) fn set_dataflow_metainfo(
1495        &mut self,
1496        id: GlobalId,
1497        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
1498    ) {
1499        // Add entries to the `notices_by_dep_id` lookup map.
1500        for notice in metainfo.optimizer_notices.iter() {
1501            for dep_id in notice.dependencies.iter() {
1502                self.notices_by_dep_id
1503                    .entry(*dep_id)
1504                    .or_default()
1505                    .push(Arc::clone(notice));
1506            }
1507            if let Some(item_id) = notice.item_id {
1508                soft_assert_eq_or_log!(
1509                    item_id,
1510                    id,
1511                    "notice.item_id should match the id for whom we are saving the notice"
1512                );
1513            }
1514        }
1515        // Set the metainfo on the catalog object.
1516        let item_id = self.entry_by_global_id[&id];
1517        let entry = self.get_entry_mut(&item_id);
1518        match entry.item_mut() {
1519            CatalogItem::Index(idx) => idx.dataflow_metainfo = Some(metainfo),
1520            CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo = Some(metainfo),
1521            other => panic!("set_dataflow_metainfo called on {} ({:?})", id, other.typ()),
1522        }
1523    }
1524
1525    /// Clean up optimizer notices for the given dropped catalog entries.
1526    ///
1527    /// This extracts notices directly from the owned entries (which
1528    /// have already been removed from the catalog maps), cleans up
1529    /// the `notices_by_dep_id` reverse index, and removes notices
1530    /// from other (still-live) catalog objects that depended on the
1531    /// dropped items.
1532    ///
1533    /// Returns the set of all dropped notices for builtin table
1534    /// retraction.
1535    #[mz_ore::instrument(level = "trace")]
1536    pub(super) fn drop_optimizer_notices(
1537        &mut self,
1538        dropped_entries: Vec<CatalogEntry>,
1539    ) -> BTreeSet<Arc<OptimizerNotice>> {
1540        let mut dropped_notices = BTreeSet::new();
1541        let mut drop_ids = BTreeSet::new();
1542
1543        // Extract notices directly from the owned dropped entries.
1544        for mut entry in dropped_entries {
1545            drop_ids.extend(entry.global_ids());
1546            let metainfo = match entry.item_mut() {
1547                CatalogItem::Index(idx) => idx.dataflow_metainfo.take(),
1548                CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.take(),
1549                _ => None,
1550            };
1551            if let Some(mut metainfo) = metainfo {
1552                soft_assert_or_log!(
1553                    metainfo.optimizer_notices.iter().all_unique(),
1554                    "should have been pushed there by \
1555                     `push_optimizer_notice_dedup`"
1556                );
1557                for n in metainfo.optimizer_notices.drain(..) {
1558                    // Clean up notices_by_dep_id for this notice's
1559                    // dependencies.
1560                    for dep_id in n.dependencies.iter() {
1561                        if let Some(notices) = self.notices_by_dep_id.get_mut(dep_id) {
1562                            notices.retain(|x| &n != x);
1563                            if notices.is_empty() {
1564                                self.notices_by_dep_id.remove(dep_id);
1565                            }
1566                        }
1567                    }
1568                    dropped_notices.insert(n);
1569                }
1570            }
1571        }
1572
1573        // Remove notices_by_dep_id entries keyed by the dropped IDs.
1574        // These are notices on OTHER items that depend on a dropped
1575        // item. We need to remove them from those items' metainfo.
1576        for id in &drop_ids {
1577            if let Some(notices) = self.notices_by_dep_id.remove(id) {
1578                for n in notices.into_iter() {
1579                    // Remove the notice from the catalog object it
1580                    // lives on (if that object still exists — it
1581                    // may have been dropped too, in which case the
1582                    // notice was already collected above).
1583                    if let Some(item_id) = n.item_id.as_ref() {
1584                        if let Some(entry) = self.try_get_entry_by_global_id(item_id) {
1585                            let catalog_item_id = entry.id();
1586                            let entry = self.get_entry_mut(&catalog_item_id);
1587                            let item = entry.item_mut();
1588                            match item {
1589                                CatalogItem::Index(idx) => {
1590                                    if let Some(ref mut m) = idx.dataflow_metainfo {
1591                                        m.optimizer_notices.retain(|x| &n != x);
1592                                    }
1593                                }
1594                                CatalogItem::MaterializedView(mv) => {
1595                                    if let Some(ref mut m) = mv.dataflow_metainfo {
1596                                        m.optimizer_notices.retain(|x| &n != x);
1597                                    }
1598                                }
1599                                _ => {}
1600                            }
1601                        }
1602                    }
1603                    dropped_notices.insert(n);
1604                }
1605            }
1606        }
1607
1608        // Clean up notices_by_dep_id entries for dependency IDs
1609        // that are NOT being dropped but had dropped notices.
1610        let todo_dep_ids: BTreeSet<GlobalId> = dropped_notices
1611            .iter()
1612            .flat_map(|n| n.dependencies.iter())
1613            .filter(|dep_id| !drop_ids.contains(dep_id))
1614            .copied()
1615            .collect();
1616        for id in todo_dep_ids {
1617            if let Some(notices) = self.notices_by_dep_id.get_mut(&id) {
1618                notices.retain(|n| !dropped_notices.contains(n));
1619                if notices.is_empty() {
1620                    self.notices_by_dep_id.remove(&id);
1621                }
1622            }
1623        }
1624
1625        dropped_notices
1626    }
1627
1628    fn get_schema_mut(
1629        &mut self,
1630        database_spec: &ResolvedDatabaseSpecifier,
1631        schema_spec: &SchemaSpecifier,
1632        conn_id: &ConnectionId,
1633    ) -> &mut Schema {
1634        // Keep in sync with `get_schemas`
1635        match (database_spec, schema_spec) {
1636            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1637                .temporary_schemas
1638                .get_mut(conn_id)
1639                .expect("catalog out of sync"),
1640            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1641                .ambient_schemas_by_id
1642                .get_mut(id)
1643                .expect("catalog out of sync"),
1644            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1645                .database_by_id
1646                .get_mut(database_id)
1647                .expect("catalog out of sync")
1648                .schemas_by_id
1649                .get_mut(schema_id)
1650                .expect("catalog out of sync"),
1651            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1652                unreachable!("temporary schemas are in the ambient database")
1653            }
1654        }
1655    }
1656
1657    /// Install builtin views to the catalog. This is its own function so that views can be
1658    /// optimized in parallel.
1659    ///
1660    /// The implementation is similar to `apply_updates_for_bootstrap` and determines dependency
1661    /// problems by sniffing out specific errors and then retrying once those dependencies are
1662    /// complete. This doesn't work for everything (casts, function implementations) so we also need
1663    /// to have a bucket for everything at the end. Additionally, because this executes in parellel,
1664    /// we must maintain a completed set otherwise races could result in orphaned views languishing
1665    /// in awaiting with nothing retriggering the attempt.
1666    #[instrument(name = "catalog::parse_views")]
1667    async fn parse_builtin_views(
1668        state: &mut CatalogState,
1669        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
1670        retractions: &mut InProgressRetractions,
1671        local_expression_cache: &mut LocalExpressionCache,
1672    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1673        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
1674        let (updates, additions): (Vec<_>, Vec<_>) =
1675            builtin_views
1676                .into_iter()
1677                .partition_map(|(view, item_id, gid)| {
1678                    match retractions.system_object_mappings.remove(&item_id) {
1679                        Some(entry) => Either::Left(entry),
1680                        None => Either::Right((view, item_id, gid)),
1681                    }
1682                });
1683
1684        for entry in updates {
1685            // This implies that we updated the fingerprint for some builtin view. The retraction
1686            // was parsed, planned, and optimized using the compiled in definition, not the
1687            // definition from a previous version. So we can just stick the old entry back into the
1688            // catalog.
1689            let item_id = entry.id();
1690            state.insert_entry(entry);
1691            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
1692        }
1693
1694        let mut handles = Vec::new();
1695        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
1696            BTreeMap::new();
1697        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
1698        // Some errors are due to the implementation of casts or SQL functions that depend on some
1699        // view. Instead of figuring out the exact view dependency, delay these until the end.
1700        let mut awaiting_all = Vec::new();
1701        // Completed views, needed to avoid race conditions.
1702        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
1703        let mut completed_names: BTreeSet<String> = BTreeSet::new();
1704
1705        // Avoid some reference lifetime issues by not passing `builtin` into the spawned task.
1706        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
1707            .into_iter()
1708            .map(|(view, item_id, gid)| (item_id, (view, gid)))
1709            .collect();
1710        let item_ids: Vec<_> = views.keys().copied().collect();
1711
1712        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
1713        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
1714            if handles.is_empty() && ready.is_empty() {
1715                // Enqueue the views that were waiting for all the others.
1716                ready.extend(awaiting_all.drain(..));
1717            }
1718
1719            // Spawn tasks for all ready views.
1720            if !ready.is_empty() {
1721                let spawn_state = Arc::new(state.clone());
1722                while let Some(id) = ready.pop_front() {
1723                    let (view, global_id) = views.get(&id).expect("must exist");
1724                    let global_id = *global_id;
1725                    let create_sql = view.create_sql();
1726                    // Views can't be versioned.
1727                    let versions = BTreeMap::new();
1728
1729                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
1730                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
1731                    let task_state = Arc::clone(&spawn_state);
1732                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1733                    let handle = mz_ore::task::spawn_blocking(
1734                        || "parse view",
1735                        move || {
1736                            span.in_scope(|| {
1737                                let res = task_state.parse_item_inner(
1738                                    global_id,
1739                                    &create_sql,
1740                                    &versions,
1741                                    None,
1742                                    false,
1743                                    None,
1744                                    cached_expr,
1745                                    None,
1746                                );
1747                                (id, global_id, res)
1748                            })
1749                        },
1750                    );
1751                    handles.push(handle);
1752                }
1753            }
1754
1755            // Wait for a view to be ready.
1756            let (selected, _idx, remaining) = future::select_all(handles).await;
1757            handles = remaining;
1758            let (id, global_id, res) = selected;
1759            let mut insert_cached_expr = |cached_expr| {
1760                if let Some(cached_expr) = cached_expr {
1761                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
1762                }
1763            };
1764            match res {
1765                Ok((item, uncached_expr)) => {
1766                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1767                        local_expression_cache.insert_uncached_expression(
1768                            global_id,
1769                            uncached_expr,
1770                            optimizer_features,
1771                        );
1772                    }
1773                    // Add item to catalog.
1774                    let (view, _gid) = views.remove(&id).expect("must exist");
1775                    let schema_id = state
1776                        .ambient_schemas_by_name
1777                        .get(view.schema)
1778                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
1779                    let qname = QualifiedItemName {
1780                        qualifiers: ItemQualifiers {
1781                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1782                            schema_spec: SchemaSpecifier::Id(*schema_id),
1783                        },
1784                        item: view.name.into(),
1785                    };
1786                    let mut acl_items = vec![rbac::owner_privilege(
1787                        mz_sql::catalog::ObjectType::View,
1788                        MZ_SYSTEM_ROLE_ID,
1789                    )];
1790                    acl_items.extend_from_slice(&view.access);
1791
1792                    state.insert_item(
1793                        id,
1794                        view.oid,
1795                        qname,
1796                        item,
1797                        MZ_SYSTEM_ROLE_ID,
1798                        PrivilegeMap::from_mz_acl_items(acl_items),
1799                    );
1800
1801                    // Enqueue any items waiting on this dependency.
1802                    let mut resolved_dependent_items = Vec::new();
1803                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
1804                        resolved_dependent_items.extend(dependent_items);
1805                    }
1806                    let entry = state.get_entry(&id);
1807                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
1808                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
1809                        resolved_dependent_items.extend(dependent_items);
1810                    }
1811                    ready.extend(resolved_dependent_items);
1812
1813                    completed_ids.insert(id);
1814                    completed_names.insert(full_name);
1815                }
1816                // If we were missing a dependency, wait for it to be added.
1817                Err((
1818                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
1819                    cached_expr,
1820                )) => {
1821                    insert_cached_expr(cached_expr);
1822                    if completed_ids.contains(&missing_dep) {
1823                        ready.push_back(id);
1824                    } else {
1825                        awaiting_id_dependencies
1826                            .entry(missing_dep)
1827                            .or_default()
1828                            .push(id);
1829                    }
1830                }
1831                // If we were missing a dependency, wait for it to be added.
1832                Err((
1833                    AdapterError::PlanError(plan::PlanError::Catalog(
1834                        SqlCatalogError::UnknownItem(missing_dep),
1835                    )),
1836                    cached_expr,
1837                )) => {
1838                    insert_cached_expr(cached_expr);
1839                    match CatalogItemId::from_str(&missing_dep) {
1840                        Ok(missing_dep) => {
1841                            if completed_ids.contains(&missing_dep) {
1842                                ready.push_back(id);
1843                            } else {
1844                                awaiting_id_dependencies
1845                                    .entry(missing_dep)
1846                                    .or_default()
1847                                    .push(id);
1848                            }
1849                        }
1850                        Err(_) => {
1851                            if completed_names.contains(&missing_dep) {
1852                                ready.push_back(id);
1853                            } else {
1854                                awaiting_name_dependencies
1855                                    .entry(missing_dep)
1856                                    .or_default()
1857                                    .push(id);
1858                            }
1859                        }
1860                    }
1861                }
1862                Err((
1863                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
1864                    cached_expr,
1865                )) => {
1866                    insert_cached_expr(cached_expr);
1867                    awaiting_all.push(id);
1868                }
1869                Err((e, _)) => {
1870                    let (bad_view, _gid) = views.get(&id).expect("must exist");
1871                    panic!(
1872                        "internal error: failed to load bootstrap view:\n\
1873                            {name}\n\
1874                            error:\n\
1875                            {e:?}\n\n\
1876                            Make sure that the schema name is specified in the builtin view's create sql statement.
1877                            ",
1878                        name = bad_view.name,
1879                    )
1880                }
1881            }
1882        }
1883
1884        assert!(awaiting_id_dependencies.is_empty());
1885        assert!(
1886            awaiting_name_dependencies.is_empty(),
1887            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
1888        );
1889        assert!(awaiting_all.is_empty());
1890        assert!(views.is_empty());
1891
1892        // Generate a builtin table update for all the new views.
1893        builtin_table_updates.extend(
1894            item_ids
1895                .into_iter()
1896                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
1897        );
1898
1899        builtin_table_updates
1900    }
1901
1902    /// Associates a name, `CatalogItemId`, and entry.
1903    fn insert_entry(&mut self, entry: CatalogEntry) {
1904        if !entry.id.is_system() {
1905            if let Some(cluster_id) = entry.item.cluster_id() {
1906                self.clusters_by_id
1907                    .get_mut(&cluster_id)
1908                    .expect("catalog out of sync")
1909                    .bound_objects
1910                    .insert(entry.id);
1911            };
1912        }
1913
1914        for u in entry.references().items() {
1915            match self.entry_by_id.get_mut(u) {
1916                Some(metadata) => metadata.referenced_by.push(entry.id()),
1917                None => panic!(
1918                    "Catalog: missing dependent catalog item {} while installing {}",
1919                    &u,
1920                    self.resolve_full_name(entry.name(), entry.conn_id())
1921                ),
1922            }
1923        }
1924        for u in entry.uses() {
1925            // Ignore self for self-referential tasks (e.g. Continual Tasks), if
1926            // present.
1927            if u == entry.id() {
1928                continue;
1929            }
1930            match self.entry_by_id.get_mut(&u) {
1931                Some(metadata) => metadata.used_by.push(entry.id()),
1932                None => panic!(
1933                    "Catalog: missing dependent catalog item {} while installing {}",
1934                    &u,
1935                    self.resolve_full_name(entry.name(), entry.conn_id())
1936                ),
1937            }
1938        }
1939        for gid in entry.item.global_ids() {
1940            self.entry_by_global_id.insert(gid, entry.id());
1941        }
1942        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1943        // Lazily create the temporary schema if this is a temporary item and the schema
1944        // doesn't exist yet.
1945        if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
1946            && !self.temporary_schemas.contains_key(conn_id)
1947        {
1948            self.create_temporary_schema(conn_id, entry.owner_id)
1949                .expect("failed to create temporary schema");
1950        }
1951        let schema = self.get_schema_mut(
1952            &entry.name().qualifiers.database_spec,
1953            &entry.name().qualifiers.schema_spec,
1954            conn_id,
1955        );
1956
1957        let prev_id = match entry.item() {
1958            CatalogItem::Func(_) => schema
1959                .functions
1960                .insert(entry.name().item.clone(), entry.id()),
1961            CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1962            _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1963        };
1964
1965        assert!(
1966            prev_id.is_none(),
1967            "builtin name collision on {:?}",
1968            entry.name().item.clone()
1969        );
1970
1971        self.entry_by_id.insert(entry.id(), entry.clone());
1972    }
1973
1974    /// Associates a name, [`CatalogItemId`], and entry.
1975    fn insert_item(
1976        &mut self,
1977        id: CatalogItemId,
1978        oid: u32,
1979        name: QualifiedItemName,
1980        item: CatalogItem,
1981        owner_id: RoleId,
1982        privileges: PrivilegeMap,
1983    ) {
1984        let entry = CatalogEntry {
1985            item,
1986            name,
1987            id,
1988            oid,
1989            used_by: Vec::new(),
1990            referenced_by: Vec::new(),
1991            owner_id,
1992            privileges,
1993        };
1994
1995        self.insert_entry(entry);
1996    }
1997
1998    #[mz_ore::instrument(level = "trace")]
1999    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
2000        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
2001        for u in metadata.references().items() {
2002            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
2003                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
2004            }
2005        }
2006        for u in metadata.uses() {
2007            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
2008                dep_metadata.used_by.retain(|u| *u != metadata.id())
2009            }
2010        }
2011        for gid in metadata.global_ids() {
2012            self.entry_by_global_id.remove(&gid);
2013        }
2014
2015        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
2016        let schema = self.get_schema_mut(
2017            &metadata.name().qualifiers.database_spec,
2018            &metadata.name().qualifiers.schema_spec,
2019            conn_id,
2020        );
2021        if metadata.item_type() == CatalogItemType::Type {
2022            schema
2023                .types
2024                .remove(&metadata.name().item)
2025                .expect("catalog out of sync");
2026        } else {
2027            // Functions would need special handling, but we don't yet support
2028            // dropping functions.
2029            assert_ne!(metadata.item_type(), CatalogItemType::Func);
2030
2031            schema
2032                .items
2033                .remove(&metadata.name().item)
2034                .expect("catalog out of sync");
2035        };
2036
2037        if !id.is_system() {
2038            if let Some(cluster_id) = metadata.item().cluster_id() {
2039                assert!(
2040                    self.clusters_by_id
2041                        .get_mut(&cluster_id)
2042                        .expect("catalog out of sync")
2043                        .bound_objects
2044                        .remove(&id),
2045                    "catalog out of sync"
2046                );
2047            }
2048        }
2049
2050        metadata
2051    }
2052
2053    fn insert_introspection_source_index(
2054        &mut self,
2055        cluster_id: ClusterId,
2056        log: &'static BuiltinLog,
2057        item_id: CatalogItemId,
2058        global_id: GlobalId,
2059        oid: u32,
2060    ) {
2061        let (index_name, index) =
2062            self.create_introspection_source_index(cluster_id, log, global_id);
2063        self.insert_item(
2064            item_id,
2065            oid,
2066            index_name,
2067            index,
2068            MZ_SYSTEM_ROLE_ID,
2069            PrivilegeMap::default(),
2070        );
2071    }
2072
2073    fn create_introspection_source_index(
2074        &self,
2075        cluster_id: ClusterId,
2076        log: &'static BuiltinLog,
2077        global_id: GlobalId,
2078    ) -> (QualifiedItemName, CatalogItem) {
2079        let source_name = FullItemName {
2080            database: RawDatabaseSpecifier::Ambient,
2081            schema: log.schema.into(),
2082            item: log.name.into(),
2083        };
2084        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
2085        let mut index_name = QualifiedItemName {
2086            qualifiers: ItemQualifiers {
2087                database_spec: ResolvedDatabaseSpecifier::Ambient,
2088                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
2089            },
2090            item: index_name.clone(),
2091        };
2092        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
2093        let index_item_name = index_name.item.clone();
2094        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
2095        let index = CatalogItem::Index(Index {
2096            global_id,
2097            on: log_global_id,
2098            keys: log
2099                .variant
2100                .index_by()
2101                .into_iter()
2102                .map(MirScalarExpr::column)
2103                .collect(),
2104            create_sql: index_sql(
2105                index_item_name,
2106                cluster_id,
2107                source_name,
2108                &log.variant.desc(),
2109                &log.variant.index_by(),
2110            ),
2111            conn_id: None,
2112            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
2113            cluster_id,
2114            is_retained_metrics_object: false,
2115            custom_logical_compaction_window: None,
2116            optimized_plan: None,
2117            physical_plan: None,
2118            dataflow_metainfo: None,
2119        });
2120        (index_name, index)
2121    }
2122
2123    /// Insert system configuration `name` with `value`.
2124    ///
2125    /// Return a `bool` value indicating whether the configuration was modified
2126    /// by the call.
2127    fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
2128        Ok(Arc::make_mut(&mut self.system_configuration).set(name, value)?)
2129    }
2130
2131    /// Reset system configuration `name`.
2132    ///
2133    /// Return a `bool` value indicating whether the configuration was modified
2134    /// by the call.
2135    fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
2136        Ok(Arc::make_mut(&mut self.system_configuration).reset(name)?)
2137    }
2138}
2139
2140/// Sort [`StateUpdate`]s in dependency order.
2141///
2142/// # Panics
2143///
2144/// This function assumes that all provided `updates` have the same timestamp and will panic
2145/// otherwise. It also requires that the provided `updates` are consolidated, i.e. all contained
2146/// `StateUpdateKinds` are unique.
2147fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
2148    fn push_update<T>(
2149        update: T,
2150        diff: StateDiff,
2151        retractions: &mut Vec<T>,
2152        additions: &mut Vec<T>,
2153    ) {
2154        match diff {
2155            StateDiff::Retraction => retractions.push(update),
2156            StateDiff::Addition => additions.push(update),
2157        }
2158    }
2159
2160    soft_assert_no_log!(
2161        updates.iter().map(|update| update.ts).all_equal(),
2162        "all timestamps should be equal: {updates:?}"
2163    );
2164    soft_assert_no_log!(
2165        {
2166            let mut dedup = BTreeSet::new();
2167            updates.iter().all(|update| dedup.insert(&update.kind))
2168        },
2169        "updates should be consolidated: {updates:?}"
2170    );
2171
2172    // Partition updates by type so that we can weave different update types into the right spots.
2173    let mut pre_cluster_retractions = Vec::new();
2174    let mut pre_cluster_additions = Vec::new();
2175    let mut cluster_retractions = Vec::new();
2176    let mut cluster_additions = Vec::new();
2177    let mut builtin_item_updates = Vec::new();
2178    let mut item_retractions = Vec::new();
2179    let mut item_additions = Vec::new();
2180    let mut temp_item_retractions = Vec::new();
2181    let mut temp_item_additions = Vec::new();
2182    let mut post_item_retractions = Vec::new();
2183    let mut post_item_additions = Vec::new();
2184    for update in updates {
2185        let diff = update.diff.clone();
2186        match update.kind {
2187            StateUpdateKind::Role(_)
2188            | StateUpdateKind::RoleAuth(_)
2189            | StateUpdateKind::Database(_)
2190            | StateUpdateKind::Schema(_)
2191            | StateUpdateKind::DefaultPrivilege(_)
2192            | StateUpdateKind::SystemPrivilege(_)
2193            | StateUpdateKind::SystemConfiguration(_)
2194            | StateUpdateKind::NetworkPolicy(_) => push_update(
2195                update,
2196                diff,
2197                &mut pre_cluster_retractions,
2198                &mut pre_cluster_additions,
2199            ),
2200            StateUpdateKind::Cluster(_)
2201            | StateUpdateKind::IntrospectionSourceIndex(_)
2202            | StateUpdateKind::ClusterReplica(_) => push_update(
2203                update,
2204                diff,
2205                &mut cluster_retractions,
2206                &mut cluster_additions,
2207            ),
2208            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
2209                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
2210            }
2211            StateUpdateKind::TemporaryItem(item) => push_update(
2212                (item, update.ts, update.diff),
2213                diff,
2214                &mut temp_item_retractions,
2215                &mut temp_item_additions,
2216            ),
2217            StateUpdateKind::Item(item) => push_update(
2218                (item, update.ts, update.diff),
2219                diff,
2220                &mut item_retractions,
2221                &mut item_additions,
2222            ),
2223            StateUpdateKind::Comment(_)
2224            | StateUpdateKind::SourceReferences(_)
2225            | StateUpdateKind::AuditLog(_)
2226            | StateUpdateKind::StorageCollectionMetadata(_)
2227            | StateUpdateKind::UnfinalizedShard(_) => push_update(
2228                update,
2229                diff,
2230                &mut post_item_retractions,
2231                &mut post_item_additions,
2232            ),
2233        }
2234    }
2235
2236    // Sort builtin item updates by dependency. The builtin definition order must be a dependency
2237    // order, so we can use that to avoid a more expensive topo sorting.
2238    let builtin_item_updates = builtin_item_updates
2239        .into_iter()
2240        .map(|(system_object_mapping, ts, diff)| {
2241            let idx = BUILTIN_LOOKUP
2242                .get(&system_object_mapping.description)
2243                .expect("missing builtin")
2244                .0;
2245            (idx, system_object_mapping, ts, diff)
2246        })
2247        .sorted_by_key(|(idx, _, _, _)| *idx)
2248        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2249
2250    // Partition builtins based on whether or not they should be applied before or after clusters.
2251    // Clusters depend on sources (introspection logs), so sources need to be applied first.
2252    // Everything else can be applied after clusters.
2253    let mut builtin_source_retractions = Vec::new();
2254    let mut builtin_source_additions = Vec::new();
2255    let mut other_builtin_retractions = Vec::new();
2256    let mut other_builtin_additions = Vec::new();
2257    for (builtin_item_update, ts, diff) in builtin_item_updates {
2258        let object_type = builtin_item_update.description.object_type;
2259        let update = StateUpdate {
2260            kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2261            ts,
2262            diff,
2263        };
2264        if object_type == CatalogItemType::Source {
2265            push_update(
2266                update,
2267                diff,
2268                &mut builtin_source_retractions,
2269                &mut builtin_source_additions,
2270            );
2271        } else {
2272            push_update(
2273                update,
2274                diff,
2275                &mut other_builtin_retractions,
2276                &mut other_builtin_additions,
2277            );
2278        }
2279    }
2280
2281    /// Sort items by their dependencies using topological sort.
2282    ///
2283    /// # Panics
2284    ///
2285    /// This function requires that all provided items have unique item IDs.
2286    fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2287        tracing::debug!(?items, "sorting items by dependencies");
2288
2289        let key_fn = |item: &(mz_catalog::durable::Item, _, _)| item.0.id;
2290        let dependencies_fn = |item: &(mz_catalog::durable::Item, _, _)| {
2291            let statement = mz_sql::parse::parse(&item.0.create_sql)
2292                .expect("valid create_sql")
2293                .into_element()
2294                .ast;
2295            mz_sql::names::dependencies(&statement).expect("failed to find dependencies of item")
2296        };
2297        sort_topological(items, key_fn, dependencies_fn);
2298    }
2299
2300    /// Sort item updates by dependency.
2301    ///
2302    /// First we group items into groups that are totally ordered by dependency. For example, when
2303    /// sorting all items by dependency we know that all tables can come after all sources, because
2304    /// a source can never depend on a table. Second, we sort the items in each group in
2305    /// topological order, or by ID, depending on the type.
2306    ///
2307    /// It used to be the case that the ID order of ALL items matched the dependency order. However,
2308    /// certain migrations shuffled item IDs around s.t. this was no longer true. A much better
2309    /// approach would be to investigate each item, discover their exact dependencies, and then
2310    /// perform a topological sort. This is non-trivial because we only have the CREATE SQL of each
2311    /// item here. Within the SQL the dependent items are sometimes referred to by ID and sometimes
2312    /// referred to by name.
2313    ///
2314    /// The logic of this function should match [`sort_temp_item_updates`].
2315    fn sort_item_updates(
2316        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2317    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2318        // Partition items into groups s.t. each item in one group has a predefined order with all
2319        // items in other groups. For example, all sinks are ordered greater than all tables.
2320        let mut types = Vec::new();
2321        // N.B. Functions can depend on system tables, but not user tables.
2322        // TODO(udf): This will change when UDFs are supported.
2323        let mut funcs = Vec::new();
2324        let mut secrets = Vec::new();
2325        let mut connections = Vec::new();
2326        let mut sources = Vec::new();
2327        let mut tables = Vec::new();
2328        let mut derived_items = Vec::new();
2329        let mut sinks = Vec::new();
2330        for update in item_updates {
2331            match update.0.item_type() {
2332                CatalogItemType::Type => types.push(update),
2333                CatalogItemType::Func => funcs.push(update),
2334                CatalogItemType::Secret => secrets.push(update),
2335                CatalogItemType::Connection => connections.push(update),
2336                CatalogItemType::Source => sources.push(update),
2337                CatalogItemType::Table => tables.push(update),
2338                CatalogItemType::View
2339                | CatalogItemType::MaterializedView
2340                | CatalogItemType::Index => derived_items.push(update),
2341                CatalogItemType::Sink => sinks.push(update),
2342            }
2343        }
2344
2345        // For some groups, the items in them can depend on each other and can be `ALTER`ed so that
2346        // an item ends up depending on an item with a greater ID. Thus we need to perform
2347        // topological sort for these groups.
2348        sort_items_topological(&mut connections);
2349        sort_items_topological(&mut derived_items);
2350
2351        // Other groups we can simply sort by ID.
2352        for group in [
2353            &mut types,
2354            &mut funcs,
2355            &mut secrets,
2356            &mut sources,
2357            &mut tables,
2358            &mut sinks,
2359        ] {
2360            group.sort_by_key(|(item, _, _)| item.id);
2361        }
2362
2363        iter::empty()
2364            .chain(types)
2365            .chain(funcs)
2366            .chain(secrets)
2367            .chain(connections)
2368            .chain(sources)
2369            .chain(tables)
2370            .chain(derived_items)
2371            .chain(sinks)
2372            .collect()
2373    }
2374
2375    let item_retractions = sort_item_updates(item_retractions);
2376    let item_additions = sort_item_updates(item_additions);
2377
2378    /// Sort temporary item updates by dependency.
2379    ///
2380    /// The logic of this function should match [`sort_item_updates`].
2381    fn sort_temp_item_updates(
2382        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2383    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2384        // Partition items into groups s.t. each item in one group has a predefined order with all
2385        // items in other groups. For example, all sinks are ordered greater than all tables.
2386        let mut types = Vec::new();
2387        // N.B. Functions can depend on system tables, but not user tables.
2388        let mut funcs = Vec::new();
2389        let mut secrets = Vec::new();
2390        let mut connections = Vec::new();
2391        let mut sources = Vec::new();
2392        let mut tables = Vec::new();
2393        let mut derived_items = Vec::new();
2394        let mut sinks = Vec::new();
2395        for update in temp_item_updates {
2396            match update.0.item_type() {
2397                CatalogItemType::Type => types.push(update),
2398                CatalogItemType::Func => funcs.push(update),
2399                CatalogItemType::Secret => secrets.push(update),
2400                CatalogItemType::Connection => connections.push(update),
2401                CatalogItemType::Source => sources.push(update),
2402                CatalogItemType::Table => tables.push(update),
2403                CatalogItemType::View
2404                | CatalogItemType::MaterializedView
2405                | CatalogItemType::Index => derived_items.push(update),
2406                CatalogItemType::Sink => sinks.push(update),
2407            }
2408        }
2409
2410        // Within each group, sort by ID.
2411        for group in [
2412            &mut types,
2413            &mut funcs,
2414            &mut secrets,
2415            &mut connections,
2416            &mut sources,
2417            &mut tables,
2418            &mut derived_items,
2419            &mut sinks,
2420        ] {
2421            group.sort_by_key(|(item, _, _)| item.id);
2422        }
2423
2424        iter::empty()
2425            .chain(types)
2426            .chain(funcs)
2427            .chain(secrets)
2428            .chain(connections)
2429            .chain(sources)
2430            .chain(tables)
2431            .chain(derived_items)
2432            .chain(sinks)
2433            .collect()
2434    }
2435    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2436    let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2437
2438    /// Merge sorted temporary and non-temp items.
2439    fn merge_item_updates(
2440        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2441        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2442    ) -> Vec<StateUpdate> {
2443        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2444
2445        while let (Some((item, _, _)), Some((temp_item, _, _))) =
2446            (item_updates.front(), temp_item_updates.front())
2447        {
2448            if item.id < temp_item.id {
2449                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2450                state_updates.push(StateUpdate {
2451                    kind: StateUpdateKind::Item(item),
2452                    ts,
2453                    diff,
2454                });
2455            } else if item.id > temp_item.id {
2456                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2457                state_updates.push(StateUpdate {
2458                    kind: StateUpdateKind::TemporaryItem(temp_item),
2459                    ts,
2460                    diff,
2461                });
2462            } else {
2463                unreachable!(
2464                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2465                );
2466            }
2467        }
2468
2469        while let Some((item, ts, diff)) = item_updates.pop_front() {
2470            state_updates.push(StateUpdate {
2471                kind: StateUpdateKind::Item(item),
2472                ts,
2473                diff,
2474            });
2475        }
2476
2477        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2478            state_updates.push(StateUpdate {
2479                kind: StateUpdateKind::TemporaryItem(temp_item),
2480                ts,
2481                diff,
2482            });
2483        }
2484
2485        state_updates
2486    }
2487    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2488    let item_additions = merge_item_updates(item_additions, temp_item_additions);
2489
2490    // Put everything back together.
2491    iter::empty()
2492        // All retractions must be reversed.
2493        .chain(post_item_retractions.into_iter().rev())
2494        .chain(item_retractions.into_iter().rev())
2495        .chain(other_builtin_retractions.into_iter().rev())
2496        .chain(cluster_retractions.into_iter().rev())
2497        .chain(builtin_source_retractions.into_iter().rev())
2498        .chain(pre_cluster_retractions.into_iter().rev())
2499        .chain(pre_cluster_additions)
2500        .chain(builtin_source_additions)
2501        .chain(cluster_additions)
2502        .chain(other_builtin_additions)
2503        .chain(item_additions)
2504        .chain(post_item_additions)
2505        .collect()
2506}
2507
2508/// Groups of updates of certain types are applied in batches to improve
2509/// performance. A constraint is that updates must be applied in order. This
2510/// process is modeled as a state machine that batches then applies groups of
2511/// updates.
2512enum ApplyState {
2513    /// Additions of builtin views.
2514    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
2515    /// Item updates that aren't builtin view additions.
2516    ///
2517    /// This contains all updates whose application requires calling
2518    /// `parse_item` and thus toggling the `enable_for_item_parsing` feature
2519    /// flags.
2520    Items(Vec<StateUpdate>),
2521    /// All other updates.
2522    Updates(Vec<StateUpdate>),
2523}
2524
2525impl ApplyState {
2526    fn new(update: StateUpdate) -> Self {
2527        use StateUpdateKind::*;
2528        match &update.kind {
2529            SystemObjectMapping(som)
2530                if som.description.object_type == CatalogItemType::View
2531                    && update.diff == StateDiff::Addition =>
2532            {
2533                let view_addition = lookup_builtin_view_addition(som.clone());
2534                Self::BuiltinViewAdditions(vec![view_addition])
2535            }
2536
2537            IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
2538                Self::Items(vec![update])
2539            }
2540
2541            Role(_)
2542            | RoleAuth(_)
2543            | Database(_)
2544            | Schema(_)
2545            | DefaultPrivilege(_)
2546            | SystemPrivilege(_)
2547            | SystemConfiguration(_)
2548            | Cluster(_)
2549            | NetworkPolicy(_)
2550            | ClusterReplica(_)
2551            | SourceReferences(_)
2552            | Comment(_)
2553            | AuditLog(_)
2554            | StorageCollectionMetadata(_)
2555            | UnfinalizedShard(_) => Self::Updates(vec![update]),
2556        }
2557    }
2558
2559    /// Apply all updates that have been batched in `self`.
2560    ///
2561    /// We make sure to enable all "enable_for_item_parsing" feature flags when applying item
2562    /// updates during bootstrap. See [`CatalogState::with_enable_for_item_parsing`] for more
2563    /// details.
2564    async fn apply(
2565        self,
2566        state: &mut CatalogState,
2567        retractions: &mut InProgressRetractions,
2568        local_expression_cache: &mut LocalExpressionCache,
2569    ) -> (
2570        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2571        Vec<ParsedStateUpdate>,
2572    ) {
2573        match self {
2574            Self::BuiltinViewAdditions(builtin_view_additions) => {
2575                let restore = Arc::clone(&state.system_configuration);
2576                Arc::make_mut(&mut state.system_configuration).enable_for_item_parsing();
2577                let builtin_table_updates = CatalogState::parse_builtin_views(
2578                    state,
2579                    builtin_view_additions,
2580                    retractions,
2581                    local_expression_cache,
2582                )
2583                .await;
2584                state.system_configuration = restore;
2585                (builtin_table_updates, Vec::new())
2586            }
2587            Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
2588                state
2589                    .apply_updates_inner(updates, retractions, local_expression_cache)
2590                    .expect("corrupt catalog")
2591            }),
2592            Self::Updates(updates) => state
2593                .apply_updates_inner(updates, retractions, local_expression_cache)
2594                .expect("corrupt catalog"),
2595        }
2596    }
2597
2598    async fn step(
2599        self,
2600        next: Self,
2601        state: &mut CatalogState,
2602        retractions: &mut InProgressRetractions,
2603        local_expression_cache: &mut LocalExpressionCache,
2604    ) -> (
2605        Self,
2606        (
2607            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2608            Vec<ParsedStateUpdate>,
2609        ),
2610    ) {
2611        match (self, next) {
2612            (
2613                Self::BuiltinViewAdditions(mut builtin_view_additions),
2614                Self::BuiltinViewAdditions(next_builtin_view_additions),
2615            ) => {
2616                // Continue batching builtin view additions.
2617                builtin_view_additions.extend(next_builtin_view_additions);
2618                (
2619                    Self::BuiltinViewAdditions(builtin_view_additions),
2620                    (Vec::new(), Vec::new()),
2621                )
2622            }
2623            (Self::Items(mut updates), Self::Items(next_updates)) => {
2624                // Continue batching item updates.
2625                updates.extend(next_updates);
2626                (Self::Items(updates), (Vec::new(), Vec::new()))
2627            }
2628            (Self::Updates(mut updates), Self::Updates(next_updates)) => {
2629                // Continue batching updates.
2630                updates.extend(next_updates);
2631                (Self::Updates(updates), (Vec::new(), Vec::new()))
2632            }
2633            (apply_state, next_apply_state) => {
2634                // Apply the current batch and start batching new apply state.
2635                let updates = apply_state
2636                    .apply(state, retractions, local_expression_cache)
2637                    .await;
2638                (next_apply_state, updates)
2639            }
2640        }
2641    }
2642}
2643
2644/// Trait abstracting over map operations needed by [`apply_inverted_lookup`] and
2645/// [`apply_with_update`]. Both [`BTreeMap`] and [`imbl::OrdMap`] implement this.
2646trait MutableMap<K, V> {
2647    fn insert(&mut self, key: K, value: V) -> Option<V>;
2648    fn remove(&mut self, key: &K) -> Option<V>;
2649}
2650
2651impl<K: Ord, V> MutableMap<K, V> for BTreeMap<K, V> {
2652    fn insert(&mut self, key: K, value: V) -> Option<V> {
2653        BTreeMap::insert(self, key, value)
2654    }
2655    fn remove(&mut self, key: &K) -> Option<V> {
2656        BTreeMap::remove(self, key)
2657    }
2658}
2659
2660impl<K: Ord + Clone, V: Clone> MutableMap<K, V> for imbl::OrdMap<K, V> {
2661    fn insert(&mut self, key: K, value: V) -> Option<V> {
2662        imbl::OrdMap::insert(self, key, value)
2663    }
2664    fn remove(&mut self, key: &K) -> Option<V> {
2665        imbl::OrdMap::remove(self, key)
2666    }
2667}
2668
2669/// Helper method to updated inverted lookup maps. The keys are generally names and the values are
2670/// generally IDs.
2671///
2672/// Importantly, when retracting it's expected that the existing value will match `value` exactly.
2673fn apply_inverted_lookup<K, V>(map: &mut impl MutableMap<K, V>, key: &K, value: V, diff: StateDiff)
2674where
2675    K: Ord + Clone + Debug,
2676    V: PartialEq + Debug,
2677{
2678    match diff {
2679        StateDiff::Retraction => {
2680            let prev = map.remove(key);
2681            assert_eq!(
2682                prev,
2683                Some(value),
2684                "retraction does not match existing value: {key:?}"
2685            );
2686        }
2687        StateDiff::Addition => {
2688            let prev = map.insert(key.clone(), value);
2689            assert_eq!(
2690                prev, None,
2691                "values must be explicitly retracted before inserting a new value: {key:?}"
2692            );
2693        }
2694    }
2695}
2696
2697/// Helper method to update catalog state, that may need to be updated from a previously retracted
2698/// object.
2699fn apply_with_update<K, V, D>(
2700    map: &mut impl MutableMap<K, V>,
2701    durable: D,
2702    key_fn: impl FnOnce(&D) -> K,
2703    diff: StateDiff,
2704    retractions: &mut BTreeMap<D::Key, V>,
2705) where
2706    K: Ord,
2707    V: UpdateFrom<D> + PartialEq + Debug,
2708    D: DurableType,
2709    D::Key: Ord,
2710{
2711    match diff {
2712        StateDiff::Retraction => {
2713            let mem_key = key_fn(&durable);
2714            let value = map
2715                .remove(&mem_key)
2716                .expect("retraction does not match existing value: {key:?}");
2717            let durable_key = durable.into_key_value().0;
2718            retractions.insert(durable_key, value);
2719        }
2720        StateDiff::Addition => {
2721            let mem_key = key_fn(&durable);
2722            let durable_key = durable.key();
2723            let value = match retractions.remove(&durable_key) {
2724                Some(mut retraction) => {
2725                    retraction.update_from(durable);
2726                    retraction
2727                }
2728                None => durable.into(),
2729            };
2730            let prev = map.insert(mem_key, value);
2731            assert_eq!(
2732                prev, None,
2733                "values must be explicitly retracted before inserting a new value"
2734            );
2735        }
2736    }
2737}
2738
2739/// Looks up a [`BuiltinView`] from a [`SystemObjectMapping`].
2740fn lookup_builtin_view_addition(
2741    mapping: SystemObjectMapping,
2742) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2743    let (_, builtin) = BUILTIN_LOOKUP
2744        .get(&mapping.description)
2745        .expect("missing builtin view");
2746    let Builtin::View(view) = builtin else {
2747        unreachable!("programming error, expected BuiltinView found {builtin:?}");
2748    };
2749
2750    (
2751        view,
2752        mapping.unique_identifier.catalog_id,
2753        mapping.unique_identifier.global_id,
2754    )
2755}