// mz_adapter/catalog/apply.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to applying updates from a [`mz_catalog::durable::DurableCatalogState`] to a
11//! [`CatalogState`].
12
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25    BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28    ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29    SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, Func, Index, Log, NetworkPolicy,
35    Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36    TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_compute_types::dataflows::DataflowDescription;
40use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_ore::collections::CollectionExt;
44use mz_ore::tracing::OpenTelemetryContext;
45use mz_ore::{instrument, soft_assert_eq_or_log, soft_assert_no_log, soft_assert_or_log};
46use mz_pgrepr::oid::INVALID_OID;
47use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
48use mz_repr::role_id::RoleId;
49use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
50use mz_sql::catalog::CatalogError as SqlCatalogError;
51use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
52use mz_sql::names::{
53    FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
54    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
55};
56use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
57use mz_sql::session::vars::{VarError, VarInput};
58use mz_sql::{plan, rbac};
59use mz_sql_parser::ast::Expr;
60use mz_storage_types::sources::Timeline;
61use mz_transform::dataflow::DataflowMetainfo;
62use mz_transform::notice::OptimizerNotice;
63use tracing::{info_span, warn};
64
65use crate::AdapterError;
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{BuiltinTableUpdate, CatalogState};
68use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
69use crate::util::{index_sql, sort_topological};
70
/// Maintains the state of retractions while applying catalog state updates for a single timestamp.
/// [`CatalogState`] maintains denormalized state for certain catalog objects. Updating an object
/// results in applying a retraction for that object followed by applying an addition for that
/// object. When applying those additions it can be extremely expensive to re-build that
/// denormalized state from scratch. To avoid that issue we stash the denormalized state from
/// retractions, so it can be used during additions.
///
/// Not all objects maintain denormalized state, so we only stash the retractions for the subset of
/// objects that maintain denormalized state.
// TODO(jkosh44) It might be simpler or more future proof to include all object types here, even if
// the update step is a no-op for certain types.
#[derive(Debug, Clone, Default)]
struct InProgressRetractions {
    // Each map stashes an object retracted at the current timestamp, keyed by
    // its durable-catalog key, so that a matching addition can reuse the
    // already-built denormalized state instead of rebuilding it.
    roles: BTreeMap<RoleKey, Role>,
    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
    databases: BTreeMap<DatabaseKey, Database>,
    schemas: BTreeMap<SchemaKey, Schema>,
    clusters: BTreeMap<ClusterKey, Cluster>,
    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
    // Durable (persisted) items.
    items: BTreeMap<ItemKey, CatalogEntry>,
    // Session-scoped temporary items, keyed directly by item ID.
    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
    // Per-cluster introspection source indexes (see
    // `apply_introspection_source_index_update`).
    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
    // Non-runtime-alterable builtin items (see
    // `apply_system_object_mapping_update`).
    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}
95
96impl CatalogState {
    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
    ///
    /// Returns a pair of:
    ///   - builtin table updates corresponding to the changes to catalog state, and
    ///   - the parsed form of each applied state update.
    #[must_use]
    #[instrument]
    pub(crate) async fn apply_updates(
        &mut self,
        updates: Vec<StateUpdate>,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
        Vec<ParsedStateUpdate>,
    ) {
        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::with_capacity(updates.len());

        // First, consolidate updates. The code that applies parsed state
        // updates _requires_ that the given updates are consolidated. There
        // must be at most one addition and/or one retraction for a given item,
        // as identified by that items ID type.
        let updates = Self::consolidate_updates(updates);

        // Apply updates in groups, according to their timestamps.
        let mut groups: Vec<Vec<_>> = Vec::new();
        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
            // Bring the updates into the pseudo-topological order that we need
            // for updating our in-memory state and generating builtin table
            // updates.
            let updates = sort_updates(updates.collect());
            groups.push(updates);
        }

        for updates in groups {
            // `retractions` stashes denormalized state retracted at this
            // timestamp so that a matching addition can reuse it (see
            // `InProgressRetractions`). The retraction stash is scoped to a
            // single timestamp group on purpose.
            let mut apply_state = ApplyState::Updates(Vec::new());
            let mut retractions = InProgressRetractions::default();

            for update in updates {
                // Feed updates one at a time through the apply state machine;
                // each step may flush accumulated builtin table / parsed
                // catalog updates.
                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
                    .step(
                        ApplyState::new(update),
                        self,
                        &mut retractions,
                        local_expression_cache,
                    )
                    .await;
                apply_state = next_apply_state;
                builtin_table_updates.extend(builtin_table_update);
                catalog_updates.extend(catalog_update);
            }

            // Apply remaining state.
            let (builtin_table_update, catalog_update) = apply_state
                .apply(self, &mut retractions, local_expression_cache)
                .await;
            builtin_table_updates.extend(builtin_table_update);
            catalog_updates.extend(catalog_update);

            // Clean up plans and optimizer notices for items that
            // were retracted but not replaced (i.e., truly dropped).
            let dropped_entries: Vec<CatalogEntry> = retractions
                .items
                .into_values()
                .chain(retractions.temp_items.into_values())
                .collect();
            if !dropped_entries.is_empty() {
                let dropped_notices = self.drop_optimizer_notices(dropped_entries);
                if self.system_config().enable_mz_notices() {
                    self.pack_optimizer_notice_updates(
                        &mut builtin_table_updates,
                        dropped_notices.iter(),
                        Diff::MINUS_ONE,
                    );
                }
            }
        }

        (builtin_table_updates, catalog_updates)
    }
175
176    /// It can happen that the sequencing logic creates "fluctuating" updates
177    /// for a given catalog ID. For example, when doing a `DROP OWNED BY ...`,
178    /// for a table, there will be a retraction of the original table state,
179    /// then an addition for the same table but stripped of some of the roles
180    /// and access things, and then a retraction for that intermediate table
181    /// state. By consolidating, the intermediate state addition/retraction will
182    /// cancel out and we'll only see the retraction for the original state.
183    fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
184        let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
185            .into_iter()
186            .map(|update| (update.kind, update.ts, update.diff.into()))
187            .collect_vec();
188
189        consolidate_updates(&mut updates);
190
191        updates
192            .into_iter()
193            .map(|(kind, ts, diff)| StateUpdate {
194                kind,
195                ts,
196                diff: diff
197                    .try_into()
198                    .expect("catalog state cannot have diff other than -1 or 1"),
199            })
200            .collect_vec()
201    }
202
    /// Applies a batch of state updates — which must all share the same
    /// timestamp — to the in-memory catalog state.
    ///
    /// Returns the builtin table updates and parsed catalog updates
    /// corresponding to the applied changes.
    ///
    /// The ordering within each branch below is load-bearing: for retractions,
    /// builtin table updates and parsed updates are generated _before_
    /// mutating state; for additions, _after_. Both sides want to observe the
    /// catalog state in which the referenced object exists.
    #[instrument(level = "debug")]
    fn apply_updates_inner(
        &mut self,
        updates: Vec<StateUpdate>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        soft_assert_no_log!(
            updates.iter().map(|update| update.ts).all_equal(),
            "all timestamps should be equal: {updates:?}"
        );

        // Tracks whether any system configuration changed, so we can push the
        // new values into the dyncfg system once, after applying the batch.
        let mut update_system_config = false;

        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::new();

        for state_update in updates {
            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
                update_system_config = true;
            }

            match state_update.diff {
                StateDiff::Retraction => {
                    // We want the parsed catalog updates to match the state of
                    // the catalog _before_ applying a retraction. So that we can
                    // still have useful in-memory state to work with.
                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }

                    // We want the builtin table retraction to match the state of the catalog
                    // before applying the update.
                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));
                    self.apply_update(
                        state_update.kind,
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                }
                StateDiff::Addition => {
                    self.apply_update(
                        state_update.kind.clone(),
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                    // We want the builtin table addition to match the state of
                    // the catalog after applying the update. So that we already
                    // have useful in-memory state to work with.
                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));

                    // We want the parsed catalog updates to match the state of
                    // the catalog _after_ applying an addition.
                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }
                }
            }
        }

        if update_system_config {
            self.system_configuration.dyncfg_updates();
        }

        Ok((builtin_table_updates, catalog_updates))
    }
287
    /// Dispatches a single durable state update to the type-specific
    /// `apply_*_update` handler, mutating the in-memory state accordingly.
    ///
    /// Only item updates are fallible; every other handler is infallible, so
    /// the `Ok(())` at the end covers all non-item arms.
    #[instrument(level = "debug")]
    fn apply_update(
        &mut self,
        kind: StateUpdateKind,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<(), CatalogError> {
        match kind {
            StateUpdateKind::Role(role) => {
                self.apply_role_update(role, diff, retractions);
            }
            StateUpdateKind::RoleAuth(role_auth) => {
                self.apply_role_auth_update(role_auth, diff, retractions);
            }
            StateUpdateKind::Database(database) => {
                self.apply_database_update(database, diff, retractions);
            }
            StateUpdateKind::Schema(schema) => {
                self.apply_schema_update(schema, diff, retractions);
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                self.apply_default_privilege_update(default_privilege, diff, retractions);
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                self.apply_system_privilege_update(system_privilege, diff, retractions);
            }
            StateUpdateKind::SystemConfiguration(system_configuration) => {
                self.apply_system_configuration_update(system_configuration, diff, retractions);
            }
            StateUpdateKind::Cluster(cluster) => {
                self.apply_cluster_update(cluster, diff, retractions);
            }
            StateUpdateKind::NetworkPolicy(network_policy) => {
                self.apply_network_policy_update(network_policy, diff, retractions);
            }
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.apply_introspection_source_index_update(
                    introspection_source_index,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => {
                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
            }
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                self.apply_system_object_mapping_update(
                    system_object_mapping,
                    diff,
                    retractions,
                    local_expression_cache,
                );
            }
            StateUpdateKind::TemporaryItem(item) => {
                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
            }
            StateUpdateKind::Item(item) => {
                // The only fallible handler; errors propagate to the caller.
                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
            }
            StateUpdateKind::Comment(comment) => {
                self.apply_comment_update(comment, diff, retractions);
            }
            StateUpdateKind::SourceReferences(source_reference) => {
                self.apply_source_references_update(source_reference, diff, retractions);
            }
            StateUpdateKind::AuditLog(_audit_log) => {
                // Audit logs are not stored in-memory.
            }
            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
                self.apply_storage_collection_metadata_update(
                    storage_collection_metadata,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
            }
        }

        Ok(())
    }
371
372    #[instrument(level = "debug")]
373    fn apply_role_auth_update(
374        &mut self,
375        role_auth: mz_catalog::durable::RoleAuth,
376        diff: StateDiff,
377        retractions: &mut InProgressRetractions,
378    ) {
379        apply_with_update(
380            &mut self.role_auth_by_id,
381            role_auth,
382            |role_auth| role_auth.role_id,
383            diff,
384            &mut retractions.role_auths,
385        );
386    }
387
388    #[instrument(level = "debug")]
389    fn apply_role_update(
390        &mut self,
391        role: mz_catalog::durable::Role,
392        diff: StateDiff,
393        retractions: &mut InProgressRetractions,
394    ) {
395        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
396        apply_with_update(
397            &mut self.roles_by_id,
398            role,
399            |role| role.id,
400            diff,
401            &mut retractions.roles,
402        );
403    }
404
405    #[instrument(level = "debug")]
406    fn apply_database_update(
407        &mut self,
408        database: mz_catalog::durable::Database,
409        diff: StateDiff,
410        retractions: &mut InProgressRetractions,
411    ) {
412        apply_inverted_lookup(
413            &mut self.database_by_name,
414            &database.name,
415            database.id,
416            diff,
417        );
418        apply_with_update(
419            &mut self.database_by_id,
420            database,
421            |database| database.id,
422            diff,
423            &mut retractions.databases,
424        );
425    }
426
427    #[instrument(level = "debug")]
428    fn apply_schema_update(
429        &mut self,
430        schema: mz_catalog::durable::Schema,
431        diff: StateDiff,
432        retractions: &mut InProgressRetractions,
433    ) {
434        match &schema.database_id {
435            Some(database_id) => {
436                let db = self
437                    .database_by_id
438                    .get_mut(database_id)
439                    .expect("catalog out of sync");
440                apply_inverted_lookup(&mut db.schemas_by_name, &schema.name, schema.id, diff);
441                apply_with_update(
442                    &mut db.schemas_by_id,
443                    schema,
444                    |schema| schema.id,
445                    diff,
446                    &mut retractions.schemas,
447                );
448            }
449            None => {
450                apply_inverted_lookup(
451                    &mut self.ambient_schemas_by_name,
452                    &schema.name,
453                    schema.id,
454                    diff,
455                );
456                apply_with_update(
457                    &mut self.ambient_schemas_by_id,
458                    schema,
459                    |schema| schema.id,
460                    diff,
461                    &mut retractions.schemas,
462                );
463            }
464        }
465    }
466
467    #[instrument(level = "debug")]
468    fn apply_default_privilege_update(
469        &mut self,
470        default_privilege: mz_catalog::durable::DefaultPrivilege,
471        diff: StateDiff,
472        _retractions: &mut InProgressRetractions,
473    ) {
474        match diff {
475            StateDiff::Addition => Arc::make_mut(&mut self.default_privileges)
476                .grant(default_privilege.object, default_privilege.acl_item),
477            StateDiff::Retraction => Arc::make_mut(&mut self.default_privileges)
478                .revoke(&default_privilege.object, &default_privilege.acl_item),
479        }
480    }
481
482    #[instrument(level = "debug")]
483    fn apply_system_privilege_update(
484        &mut self,
485        system_privilege: MzAclItem,
486        diff: StateDiff,
487        _retractions: &mut InProgressRetractions,
488    ) {
489        match diff {
490            StateDiff::Addition => {
491                Arc::make_mut(&mut self.system_privileges).grant(system_privilege)
492            }
493            StateDiff::Retraction => {
494                Arc::make_mut(&mut self.system_privileges).revoke(&system_privilege)
495            }
496        }
497    }
498
499    #[instrument(level = "debug")]
500    fn apply_system_configuration_update(
501        &mut self,
502        system_configuration: mz_catalog::durable::SystemConfiguration,
503        diff: StateDiff,
504        _retractions: &mut InProgressRetractions,
505    ) {
506        let res = match diff {
507            StateDiff::Addition => self.insert_system_configuration(
508                &system_configuration.name,
509                VarInput::Flat(&system_configuration.value),
510            ),
511            StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
512        };
513        match res {
514            Ok(_) => (),
515            // When system variables are deleted, nothing deletes them from the underlying
516            // durable catalog, which isn't great. Still, we need to be able to ignore
517            // unknown variables.
518            Err(Error {
519                kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
520            }) => {
521                warn!(%name, "unknown system parameter from catalog storage");
522            }
523            Err(e) => panic!("unable to update system variable: {e:?}"),
524        }
525    }
526
527    #[instrument(level = "debug")]
528    fn apply_cluster_update(
529        &mut self,
530        cluster: mz_catalog::durable::Cluster,
531        diff: StateDiff,
532        retractions: &mut InProgressRetractions,
533    ) {
534        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
535        apply_with_update(
536            &mut self.clusters_by_id,
537            cluster,
538            |cluster| cluster.id,
539            diff,
540            &mut retractions.clusters,
541        );
542    }
543
544    #[instrument(level = "debug")]
545    fn apply_network_policy_update(
546        &mut self,
547        policy: mz_catalog::durable::NetworkPolicy,
548        diff: StateDiff,
549        retractions: &mut InProgressRetractions,
550    ) {
551        apply_inverted_lookup(
552            &mut self.network_policies_by_name,
553            &policy.name,
554            policy.id,
555            diff,
556        );
557        apply_with_update(
558            &mut self.network_policies_by_id,
559            policy,
560            |policy| policy.id,
561            diff,
562            &mut retractions.network_policies,
563        );
564    }
565
    /// Applies a durable introspection-source-index update: keeps the owning
    /// cluster's `log_indexes` lookup in sync and inserts/drops the
    /// corresponding index item in the catalog.
    ///
    /// # Panics
    ///
    /// Panics if the referenced cluster is missing from the in-memory state or
    /// the named builtin log does not exist (catalog out of sync).
    #[instrument(level = "debug")]
    fn apply_introspection_source_index_update(
        &mut self,
        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        let cluster = self
            .clusters_by_id
            .get_mut(&introspection_source_index.cluster_id)
            .expect("catalog out of sync");
        // Resolve the builtin log this index is built on by name.
        let log = BUILTIN_LOG_LOOKUP
            .get(introspection_source_index.name.as_str())
            .expect("missing log");
        apply_inverted_lookup(
            &mut cluster.log_indexes,
            &log.variant,
            introspection_source_index.index_id,
            diff,
        );

        match diff {
            StateDiff::Addition => {
                if let Some(mut entry) = retractions
                    .introspection_source_indexes
                    .remove(&introspection_source_index.item_id)
                {
                    // This should only happen during startup as a result of builtin migrations. We
                    // create a new index item and replace the old one with it.
                    let (index_name, index) = self.create_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.index_id,
                    );
                    // Identity (id, oid, name) must be unchanged by the
                    // migration; only the item definition is replaced.
                    assert_eq!(entry.id, introspection_source_index.item_id);
                    assert_eq!(entry.oid, introspection_source_index.oid);
                    assert_eq!(entry.name, index_name);
                    entry.item = index;
                    self.insert_entry(entry);
                } else {
                    // Fresh addition: build the index item from scratch.
                    self.insert_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.item_id,
                        introspection_source_index.index_id,
                        introspection_source_index.oid,
                    );
                }
            }
            StateDiff::Retraction => {
                // Stash the dropped entry so a matching addition (e.g. a
                // builtin migration) can reuse it.
                let entry = self.drop_item(introspection_source_index.item_id);
                retractions
                    .introspection_source_indexes
                    .insert(entry.id, entry);
            }
        }
    }
623
    /// Applies a durable cluster-replica update to the owning cluster's
    /// replica maps. No denormalized state is stashed for replicas, hence
    /// `_retractions`.
    ///
    /// # Panics
    ///
    /// Panics if the referenced cluster is missing, the replica location
    /// cannot be concretized, a retraction has no matching replica, or an
    /// addition collides with an existing replica ID.
    #[instrument(level = "debug")]
    fn apply_cluster_replica_update(
        &mut self,
        cluster_replica: mz_catalog::durable::ClusterReplica,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        // First an immutable lookup to read the availability zones, because
        // `concretize_replica_location` also borrows `self`; the mutable
        // lookup is repeated afterwards.
        let cluster = self
            .clusters_by_id
            .get(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        let azs = cluster.availability_zones();
        // NOTE(review): `&vec![]` allocates; `&[]` would likely do if the
        // parameter accepts a slice — confirm `concretize_replica_location`'s
        // signature before changing.
        let location = self
            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
            .expect("catalog in unexpected state");
        let cluster = self
            .clusters_by_id
            .get_mut(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        apply_inverted_lookup(
            &mut cluster.replica_id_by_name_,
            &cluster_replica.name,
            cluster_replica.replica_id,
            diff,
        );
        match diff {
            StateDiff::Retraction => {
                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
                assert!(
                    prev.is_some(),
                    "retraction does not match existing value: {:?}",
                    cluster_replica.replica_id
                );
            }
            StateDiff::Addition => {
                // Translate the durable replica config into the in-memory
                // controller representation.
                let logging = ReplicaLogging {
                    log_logging: cluster_replica.config.logging.log_logging,
                    interval: cluster_replica.config.logging.interval,
                };
                let config = ReplicaConfig {
                    location,
                    compute: ComputeReplicaConfig { logging },
                };
                let mem_cluster_replica = ClusterReplica {
                    name: cluster_replica.name.clone(),
                    cluster_id: cluster_replica.cluster_id,
                    replica_id: cluster_replica.replica_id,
                    config,
                    owner_id: cluster_replica.owner_id,
                };
                let prev = cluster
                    .replicas_by_id_
                    .insert(cluster_replica.replica_id, mem_cluster_replica);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value: {:?}",
                    cluster_replica.replica_id
                );
            }
        }
    }
685
686    #[instrument(level = "debug")]
687    fn apply_system_object_mapping_update(
688        &mut self,
689        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
690        diff: StateDiff,
691        retractions: &mut InProgressRetractions,
692        local_expression_cache: &mut LocalExpressionCache,
693    ) {
694        let item_id = system_object_mapping.unique_identifier.catalog_id;
695        let global_id = system_object_mapping.unique_identifier.global_id;
696
697        if system_object_mapping.unique_identifier.runtime_alterable() {
698            // Runtime-alterable system objects have real entries in the items
699            // collection and so get handled through the normal `insert_item`
700            // and `drop_item` code paths.
701            return;
702        }
703
704        if let StateDiff::Retraction = diff {
705            let entry = self.drop_item(item_id);
706            retractions.system_object_mappings.insert(item_id, entry);
707            return;
708        }
709
710        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
711            // This implies that we updated the fingerprint for some builtin item. The retraction
712            // was parsed, planned, and optimized using the compiled in definition, not the
713            // definition from a previous version. So we can just stick the old entry back into the
714            // catalog.
715            self.insert_entry(entry);
716            return;
717        }
718
719        let builtin = BUILTIN_LOOKUP
720            .get(&system_object_mapping.description)
721            .expect("missing builtin")
722            .1;
723        let schema_name = builtin.schema();
724        let schema_id = self
725            .ambient_schemas_by_name
726            .get(schema_name)
727            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
728        let name = QualifiedItemName {
729            qualifiers: ItemQualifiers {
730                database_spec: ResolvedDatabaseSpecifier::Ambient,
731                schema_spec: SchemaSpecifier::Id(*schema_id),
732            },
733            item: builtin.name().into(),
734        };
735        match builtin {
736            Builtin::Log(log) => {
737                let mut acl_items = vec![rbac::owner_privilege(
738                    mz_sql::catalog::ObjectType::Source,
739                    MZ_SYSTEM_ROLE_ID,
740                )];
741                acl_items.extend_from_slice(&log.access);
742                self.insert_item(
743                    item_id,
744                    log.oid,
745                    name.clone(),
746                    CatalogItem::Log(Log {
747                        variant: log.variant,
748                        global_id,
749                    }),
750                    MZ_SYSTEM_ROLE_ID,
751                    PrivilegeMap::from_mz_acl_items(acl_items),
752                );
753            }
754
755            Builtin::Table(table) => {
756                let mut acl_items = vec![rbac::owner_privilege(
757                    mz_sql::catalog::ObjectType::Table,
758                    MZ_SYSTEM_ROLE_ID,
759                )];
760                acl_items.extend_from_slice(&table.access);
761
762                self.insert_item(
763                    item_id,
764                    table.oid,
765                    name.clone(),
766                    CatalogItem::Table(Table {
767                        create_sql: None,
768                        desc: VersionedRelationDesc::new(table.desc.clone()),
769                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
770                        conn_id: None,
771                        resolved_ids: ResolvedIds::empty(),
772                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
773                            || {
774                                self.system_config()
775                                    .metrics_retention()
776                                    .try_into()
777                                    .expect("invalid metrics retention")
778                            },
779                        ),
780                        is_retained_metrics_object: table.is_retained_metrics_object,
781                        data_source: TableDataSource::TableWrites {
782                            defaults: vec![Expr::null(); table.desc.arity()],
783                        },
784                    }),
785                    MZ_SYSTEM_ROLE_ID,
786                    PrivilegeMap::from_mz_acl_items(acl_items),
787                );
788            }
789            Builtin::Index(index) => {
790                let custom_logical_compaction_window =
791                    index.is_retained_metrics_object.then(|| {
792                        self.system_config()
793                            .metrics_retention()
794                            .try_into()
795                            .expect("invalid metrics retention")
796                    });
797                // Indexes can't be versioned.
798                let versions = BTreeMap::new();
799
800                let item = self
801                    .parse_item(
802                        global_id,
803                        &index.create_sql(),
804                        &versions,
805                        None,
806                        index.is_retained_metrics_object,
807                        custom_logical_compaction_window,
808                        local_expression_cache,
809                        None,
810                    )
811                    .unwrap_or_else(|e| {
812                        panic!(
813                            "internal error: failed to load bootstrap index:\n\
814                                    {}\n\
815                                    error:\n\
816                                    {:?}\n\n\
817                                    make sure that the schema name is specified in the builtin index's create sql statement.",
818                            index.name, e
819                        )
820                    });
821                let CatalogItem::Index(_) = item else {
822                    panic!(
823                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
824                        index.name
825                    );
826                };
827
828                self.insert_item(
829                    item_id,
830                    index.oid,
831                    name,
832                    item,
833                    MZ_SYSTEM_ROLE_ID,
834                    PrivilegeMap::default(),
835                );
836            }
837            Builtin::View(_) => {
838                // parse_views is responsible for inserting all builtin views.
839                unreachable!("views added elsewhere");
840            }
841
842            // Note: Element types must be loaded before array types.
843            Builtin::Type(typ) => {
844                let typ = self.resolve_builtin_type_references(typ);
845                if let CatalogType::Array { element_reference } = typ.details.typ {
846                    let entry = self.get_entry_mut(&element_reference);
847                    let item_type = match &mut entry.item {
848                        CatalogItem::Type(item_type) => item_type,
849                        _ => unreachable!("types can only reference other types"),
850                    };
851                    item_type.details.array_id = Some(item_id);
852                }
853
854                let schema_id = self.resolve_system_schema(typ.schema);
855
856                self.insert_item(
857                    item_id,
858                    typ.oid,
859                    QualifiedItemName {
860                        qualifiers: ItemQualifiers {
861                            database_spec: ResolvedDatabaseSpecifier::Ambient,
862                            schema_spec: SchemaSpecifier::Id(schema_id),
863                        },
864                        item: typ.name.to_owned(),
865                    },
866                    CatalogItem::Type(Type {
867                        create_sql: None,
868                        global_id,
869                        details: typ.details.clone(),
870                        resolved_ids: ResolvedIds::empty(),
871                    }),
872                    MZ_SYSTEM_ROLE_ID,
873                    PrivilegeMap::from_mz_acl_items(vec![
874                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
875                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
876                    ]),
877                );
878            }
879
880            Builtin::Func(func) => {
881                // This OID is never used. `func` has a `Vec` of implementations and
882                // each implementation has its own OID. Those are the OIDs that are
883                // actually used by the system.
884                let oid = INVALID_OID;
885                self.insert_item(
886                    item_id,
887                    oid,
888                    name.clone(),
889                    CatalogItem::Func(Func {
890                        inner: func.inner,
891                        global_id,
892                    }),
893                    MZ_SYSTEM_ROLE_ID,
894                    PrivilegeMap::default(),
895                );
896            }
897
898            Builtin::Source(coll) => {
899                let mut acl_items = vec![rbac::owner_privilege(
900                    mz_sql::catalog::ObjectType::Source,
901                    MZ_SYSTEM_ROLE_ID,
902                )];
903                acl_items.extend_from_slice(&coll.access);
904
905                self.insert_item(
906                    item_id,
907                    coll.oid,
908                    name.clone(),
909                    CatalogItem::Source(Source {
910                        create_sql: None,
911                        data_source: coll.data_source.clone(),
912                        desc: coll.desc.clone(),
913                        global_id,
914                        timeline: Timeline::EpochMilliseconds,
915                        resolved_ids: ResolvedIds::empty(),
916                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
917                            || {
918                                self.system_config()
919                                    .metrics_retention()
920                                    .try_into()
921                                    .expect("invalid metrics retention")
922                            },
923                        ),
924                        is_retained_metrics_object: coll.is_retained_metrics_object,
925                    }),
926                    MZ_SYSTEM_ROLE_ID,
927                    PrivilegeMap::from_mz_acl_items(acl_items),
928                );
929            }
930            Builtin::MaterializedView(mv) => {
931                let mut acl_items = vec![rbac::owner_privilege(
932                    mz_sql::catalog::ObjectType::MaterializedView,
933                    MZ_SYSTEM_ROLE_ID,
934                )];
935                acl_items.extend_from_slice(&mv.access);
936
937                let custom_logical_compaction_window = mv.is_retained_metrics_object.then(|| {
938                    self.system_config()
939                        .metrics_retention()
940                        .try_into()
941                        .expect("invalid metrics retention")
942                });
943
944                // Builtin materialized views can't be versioned.
945                let versions = BTreeMap::new();
946
947                let mut item = self
948                    .parse_item(
949                        global_id,
950                        &mv.create_sql(),
951                        &versions,
952                        None,
953                        mv.is_retained_metrics_object,
954                        custom_logical_compaction_window,
955                        local_expression_cache,
956                        None,
957                    )
958                    .unwrap_or_else(|e| {
959                        panic!(
960                            "internal error: failed to load bootstrap materialized view:\n\
961                             {}\n\
962                             error:\n\
963                             {e:?}\n\n\
964                             make sure that the schema name is specified in the builtin \
965                             materialized view's create sql statement.",
966                            mv.name,
967                        )
968                    });
969                let CatalogItem::MaterializedView(catalog_mv) = &mut item else {
970                    panic!(
971                        "internal error: builtin materialized view {}'s SQL does not begin \
972                         with \"CREATE MATERIALIZED VIEW\".",
973                        mv.name,
974                    );
975                };
976
977                // The optimizer can only infer keys from MV definitions, but cannot infer
978                // uniqueness present in the input data. Extend with the keys declared in the
979                // builtin definition, to allow supplying additional key knowledge.
980                let mut desc = catalog_mv.desc.latest();
981                for key in &mv.desc.typ().keys {
982                    desc = desc.with_key(key.clone());
983                }
984                catalog_mv.desc = VersionedRelationDesc::new(desc);
985
986                self.insert_item(
987                    item_id,
988                    mv.oid,
989                    name,
990                    item,
991                    MZ_SYSTEM_ROLE_ID,
992                    PrivilegeMap::from_mz_acl_items(acl_items),
993                );
994            }
995            Builtin::ContinualTask(ct) => {
996                let mut acl_items = vec![rbac::owner_privilege(
997                    mz_sql::catalog::ObjectType::Source,
998                    MZ_SYSTEM_ROLE_ID,
999                )];
1000                acl_items.extend_from_slice(&ct.access);
1001                // Continual Tasks can't be versioned.
1002                let versions = BTreeMap::new();
1003
1004                let item = self
1005                    .parse_item(
1006                        global_id,
1007                        &ct.create_sql(),
1008                        &versions,
1009                        None,
1010                        false,
1011                        None,
1012                        local_expression_cache,
1013                        None,
1014                    )
1015                    .unwrap_or_else(|e| {
1016                        panic!(
1017                            "internal error: failed to load bootstrap continual task:\n\
1018                                    {}\n\
1019                                    error:\n\
1020                                    {:?}\n\n\
1021                                    make sure that the schema name is specified in the builtin continual task's create sql statement.",
1022                            ct.name, e
1023                        )
1024                    });
1025                let CatalogItem::ContinualTask(_) = &item else {
1026                    panic!(
1027                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
1028                        ct.name
1029                    );
1030                };
1031
1032                self.insert_item(
1033                    item_id,
1034                    ct.oid,
1035                    name,
1036                    item,
1037                    MZ_SYSTEM_ROLE_ID,
1038                    PrivilegeMap::from_mz_acl_items(acl_items),
1039                );
1040            }
1041            Builtin::Connection(connection) => {
1042                // Connections can't be versioned.
1043                let versions = BTreeMap::new();
1044                let mut item = self
1045                    .parse_item(
1046                        global_id,
1047                        connection.sql,
1048                        &versions,
1049                        None,
1050                        false,
1051                        None,
1052                        local_expression_cache,
1053                        None,
1054                    )
1055                    .unwrap_or_else(|e| {
1056                        panic!(
1057                            "internal error: failed to load bootstrap connection:\n\
1058                                    {}\n\
1059                                    error:\n\
1060                                    {:?}\n\n\
1061                                    make sure that the schema name is specified in the builtin connection's create sql statement.",
1062                            connection.name, e
1063                        )
1064                    });
1065                let CatalogItem::Connection(_) = &mut item else {
1066                    panic!(
1067                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
1068                        connection.name
1069                    );
1070                };
1071
1072                let mut acl_items = vec![rbac::owner_privilege(
1073                    mz_sql::catalog::ObjectType::Connection,
1074                    connection.owner_id.clone(),
1075                )];
1076                acl_items.extend_from_slice(connection.access);
1077
1078                self.insert_item(
1079                    item_id,
1080                    connection.oid,
1081                    name.clone(),
1082                    item,
1083                    connection.owner_id.clone(),
1084                    PrivilegeMap::from_mz_acl_items(acl_items),
1085                );
1086            }
1087        }
1088    }
1089
    /// Applies an addition or retraction of a temporary (session-scoped) item to the
    /// in-memory catalog state.
    ///
    /// On addition, the item's `create_sql` is deserialized into a [`CatalogItem`]; if a
    /// matching retraction is in progress and the SQL is unchanged, the previously built
    /// entry is reused instead of reparsing. On retraction, the entry is removed from the
    /// catalog and stashed in `retractions` so a paired addition can reuse it.
    #[instrument(level = "debug")]
    fn apply_temporary_item_update(
        &mut self,
        temporary_item: TemporaryItem,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        match diff {
            StateDiff::Addition => {
                let TemporaryItem {
                    id,
                    oid,
                    global_id,
                    schema_id,
                    name,
                    conn_id,
                    create_sql,
                    owner_id,
                    privileges,
                    extra_versions,
                } = temporary_item;
                // Lazily create the temporary schema if it doesn't exist yet.
                // We need the conn_id to create the schema, and it should always be Some for temp items.
                let temp_conn_id = conn_id
                    .as_ref()
                    .expect("temporary items must have a connection id");
                if !self.temporary_schemas.contains_key(temp_conn_id) {
                    self.create_temporary_schema(temp_conn_id, owner_id)
                        .expect("failed to create temporary schema");
                }
                // Qualify the bare item name with the temp schema's database/schema specifiers.
                let schema = self.find_temp_schema(&schema_id);
                let name = QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: schema.database().clone(),
                        schema_spec: schema.id().clone(),
                    },
                    item: name.clone(),
                };

                let entry = match retractions.temp_items.remove(&id) {
                    Some(mut retraction) => {
                        assert_eq!(retraction.id, id);

                        // We only reparse the SQL if it's changed. Otherwise, we use the existing
                        // item. This is a performance optimization and not needed for correctness.
                        // This makes it difficult to use the `UpdateFrom` trait, but the structure
                        // is still the same as the trait.
                        if retraction.create_sql() != create_sql {
                            let mut catalog_item = self
                                .deserialize_item(
                                    global_id,
                                    &create_sql,
                                    &extra_versions,
                                    local_expression_cache,
                                    Some(retraction.item),
                                )
                                .unwrap_or_else(|e| {
                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
                                });
                            // Have to patch up the item because parsing doesn't
                            // take into account temporary schemas/conn_id.
                            // NOTE(aljoscha): I don't like how we're patching
                            // this in here, but it's but one of the ways in
                            // which temporary items are a bit weird. So, here
                            // we are ...
                            catalog_item.set_conn_id(conn_id);
                            retraction.item = catalog_item;
                        }

                        // Refresh the entry's metadata from the (possibly updated) durable state;
                        // the dependency lists stored on `retraction` are carried over as-is.
                        retraction.id = id;
                        retraction.oid = oid;
                        retraction.name = name;
                        retraction.owner_id = owner_id;
                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
                        retraction
                    }
                    None => {
                        // No in-progress retraction to reuse: parse the item from scratch.
                        let mut catalog_item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                None,
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });

                        // Have to patch up the item because parsing doesn't
                        // take into account temporary schemas/conn_id.
                        // NOTE(aljoscha): I don't like how we're patching this
                        // in here, but it's but one of the ways in which
                        // temporary items are a bit weird. So, here we are ...
                        catalog_item.set_conn_id(conn_id);

                        CatalogEntry {
                            item: catalog_item,
                            referenced_by: Vec::new(),
                            used_by: Vec::new(),
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                        }
                    }
                };
                self.insert_entry(entry);
            }
            StateDiff::Retraction => {
                // Drop the entry but keep it around so that a paired addition in the same
                // batch of updates can reuse it (see the `Addition` arm above).
                let entry = self.drop_item(temporary_item.id);
                retractions.temp_items.insert(temporary_item.id, entry);
            }
        }
    }
1207
1208    #[instrument(level = "debug")]
1209    fn apply_item_update(
1210        &mut self,
1211        item: mz_catalog::durable::Item,
1212        diff: StateDiff,
1213        retractions: &mut InProgressRetractions,
1214        local_expression_cache: &mut LocalExpressionCache,
1215    ) -> Result<(), CatalogError> {
1216        match diff {
1217            StateDiff::Addition => {
1218                let key = item.key();
1219                let mz_catalog::durable::Item {
1220                    id,
1221                    oid,
1222                    global_id,
1223                    schema_id,
1224                    name,
1225                    create_sql,
1226                    owner_id,
1227                    privileges,
1228                    extra_versions,
1229                } = item;
1230                let schema = self.find_non_temp_schema(&schema_id);
1231                let name = QualifiedItemName {
1232                    qualifiers: ItemQualifiers {
1233                        database_spec: schema.database().clone(),
1234                        schema_spec: schema.id().clone(),
1235                    },
1236                    item: name.clone(),
1237                };
1238                let entry = match retractions.items.remove(&key) {
1239                    Some(retraction) => {
1240                        assert_eq!(retraction.id, item.id);
1241
1242                        let item = self
1243                            .deserialize_item(
1244                                global_id,
1245                                &create_sql,
1246                                &extra_versions,
1247                                local_expression_cache,
1248                                Some(retraction.item),
1249                            )
1250                            .unwrap_or_else(|e| {
1251                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1252                            });
1253
1254                        CatalogEntry {
1255                            item,
1256                            id,
1257                            oid,
1258                            name,
1259                            owner_id,
1260                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1261                            referenced_by: retraction.referenced_by,
1262                            used_by: retraction.used_by,
1263                        }
1264                    }
1265                    None => {
1266                        let catalog_item = self
1267                            .deserialize_item(
1268                                global_id,
1269                                &create_sql,
1270                                &extra_versions,
1271                                local_expression_cache,
1272                                None,
1273                            )
1274                            .unwrap_or_else(|e| {
1275                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1276                            });
1277                        CatalogEntry {
1278                            item: catalog_item,
1279                            referenced_by: Vec::new(),
1280                            used_by: Vec::new(),
1281                            id,
1282                            oid,
1283                            name,
1284                            owner_id,
1285                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1286                        }
1287                    }
1288                };
1289
1290                self.insert_entry(entry);
1291            }
1292            StateDiff::Retraction => {
1293                let entry = self.drop_item(item.id);
1294                let key = item.into_key_value().0;
1295                retractions.items.insert(key, entry);
1296            }
1297        }
1298        Ok(())
1299    }
1300
1301    #[instrument(level = "debug")]
1302    fn apply_comment_update(
1303        &mut self,
1304        comment: mz_catalog::durable::Comment,
1305        diff: StateDiff,
1306        _retractions: &mut InProgressRetractions,
1307    ) {
1308        match diff {
1309            StateDiff::Addition => {
1310                let prev = Arc::make_mut(&mut self.comments).update_comment(
1311                    comment.object_id,
1312                    comment.sub_component,
1313                    Some(comment.comment),
1314                );
1315                assert_eq!(
1316                    prev, None,
1317                    "values must be explicitly retracted before inserting a new value"
1318                );
1319            }
1320            StateDiff::Retraction => {
1321                let prev = Arc::make_mut(&mut self.comments).update_comment(
1322                    comment.object_id,
1323                    comment.sub_component,
1324                    None,
1325                );
1326                assert_eq!(
1327                    prev,
1328                    Some(comment.comment),
1329                    "retraction does not match existing value: ({:?}, {:?})",
1330                    comment.object_id,
1331                    comment.sub_component,
1332                );
1333            }
1334        }
1335    }
1336
1337    #[instrument(level = "debug")]
1338    fn apply_source_references_update(
1339        &mut self,
1340        source_references: mz_catalog::durable::SourceReferences,
1341        diff: StateDiff,
1342        _retractions: &mut InProgressRetractions,
1343    ) {
1344        match diff {
1345            StateDiff::Addition => {
1346                let prev = self
1347                    .source_references
1348                    .insert(source_references.source_id, source_references.into());
1349                assert!(
1350                    prev.is_none(),
1351                    "values must be explicitly retracted before inserting a new value: {prev:?}"
1352                );
1353            }
1354            StateDiff::Retraction => {
1355                let prev = self.source_references.remove(&source_references.source_id);
1356                assert!(
1357                    prev.is_some(),
1358                    "retraction for a non-existent existing value: {source_references:?}"
1359                );
1360            }
1361        }
1362    }
1363
1364    #[instrument(level = "debug")]
1365    fn apply_storage_collection_metadata_update(
1366        &mut self,
1367        storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1368        diff: StateDiff,
1369        _retractions: &mut InProgressRetractions,
1370    ) {
1371        apply_inverted_lookup(
1372            &mut Arc::make_mut(&mut self.storage_metadata).collection_metadata,
1373            &storage_collection_metadata.id,
1374            storage_collection_metadata.shard,
1375            diff,
1376        );
1377    }
1378
1379    #[instrument(level = "debug")]
1380    fn apply_unfinalized_shard_update(
1381        &mut self,
1382        unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1383        diff: StateDiff,
1384        _retractions: &mut InProgressRetractions,
1385    ) {
1386        match diff {
1387            StateDiff::Addition => {
1388                let newly_inserted = Arc::make_mut(&mut self.storage_metadata)
1389                    .unfinalized_shards
1390                    .insert(unfinalized_shard.shard);
1391                assert!(
1392                    newly_inserted,
1393                    "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1394                );
1395            }
1396            StateDiff::Retraction => {
1397                let removed = Arc::make_mut(&mut self.storage_metadata)
1398                    .unfinalized_shards
1399                    .remove(&unfinalized_shard.shard);
1400                assert!(
1401                    removed,
1402                    "retraction does not match existing value: {unfinalized_shard:?}"
1403                );
1404            }
1405        }
1406    }
1407
1408    /// Generate a list of `BuiltinTableUpdate`s that correspond to a list of updates made to the
1409    /// durable catalog.
1410    #[instrument]
1411    pub(crate) fn generate_builtin_table_updates(
1412        &self,
1413        updates: Vec<StateUpdate>,
1414    ) -> Vec<BuiltinTableUpdate> {
1415        let mut builtin_table_updates = Vec::new();
1416        for StateUpdate { kind, ts: _, diff } in updates {
1417            let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1418            let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1419            builtin_table_updates.extend(builtin_table_update);
1420        }
1421        builtin_table_updates
1422    }
1423
    /// Generate a list of `BuiltinTableUpdate`s that correspond to a single update made to the
    /// durable catalog.
    ///
    /// Each durable update kind is dispatched to the matching `pack_*_update` helper; kinds
    /// that have no corresponding builtin table rows return an empty list.
    #[instrument(level = "debug")]
    pub(crate) fn generate_builtin_table_update(
        &self,
        kind: StateUpdateKind,
        diff: StateDiff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        // Convert the catalog's state diff into the diff representation used by the
        // builtin table update helpers below.
        let diff = diff.into();
        match kind {
            StateUpdateKind::Role(role) => self.pack_role_update(role.id, diff),
            StateUpdateKind::RoleAuth(role_auth) => {
                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                vec![self.pack_default_privileges_update(
                    &default_privilege.object,
                    &default_privilege.acl_item.grantee,
                    &default_privilege.acl_item.acl_mode,
                    diff,
                )]
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                vec![self.pack_system_privileges_update(system_privilege, diff)]
            }
            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.pack_item_update(introspection_source_index.item_id, diff)
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
                cluster_replica.cluster_id,
                &cluster_replica.name,
                diff,
            ),
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                // Runtime-alterable system objects have real entries in the
                // items collection and so get handled through the normal
                // `StateUpdateKind::Item` arm instead.
                if !system_object_mapping.unique_identifier.runtime_alterable() {
                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
                } else {
                    vec![]
                }
            }
            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
                comment.object_id,
                comment.sub_component,
                &comment.comment,
                diff,
            )],
            StateUpdateKind::SourceReferences(source_references) => {
                self.pack_source_references_update(&source_references, diff)
            }
            StateUpdateKind::AuditLog(audit_log) => {
                vec![
                    self.pack_audit_log_update(&audit_log.event, diff)
                        .expect("could not pack audit log update"),
                ]
            }
            // These durable update kinds produce no builtin table rows.
            StateUpdateKind::Database(_)
            | StateUpdateKind::Schema(_)
            | StateUpdateKind::NetworkPolicy(_)
            | StateUpdateKind::StorageCollectionMetadata(_)
            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
        }
    }
1493
1494    fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1495        self.entry_by_id
1496            .get_mut(id)
1497            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1498    }
1499
1500    /// Set the optimized plan for the item identified by `id`.
1501    ///
1502    /// # Panics
1503    /// If the item is not an `Index`, `MaterializedView`, or
1504    /// `ContinualTask`.
1505    pub(super) fn set_optimized_plan(
1506        &mut self,
1507        id: GlobalId,
1508        plan: DataflowDescription<mz_expr::OptimizedMirRelationExpr>,
1509    ) {
1510        let item_id = self.entry_by_global_id[&id];
1511        let entry = self.get_entry_mut(&item_id);
1512        match entry.item_mut() {
1513            CatalogItem::Index(idx) => idx.optimized_plan = Some(Arc::new(plan)),
1514            CatalogItem::MaterializedView(mv) => mv.optimized_plan = Some(Arc::new(plan)),
1515            CatalogItem::ContinualTask(ct) => ct.optimized_plan = Some(Arc::new(plan)),
1516            other => panic!("set_optimized_plan called on {} ({:?})", id, other.typ()),
1517        }
1518    }
1519
1520    /// Set the physical plan for the item identified by `id`.
1521    ///
1522    /// # Panics
1523    /// If the item is not an `Index`, `MaterializedView`, or
1524    /// `ContinualTask`.
1525    pub(super) fn set_physical_plan(
1526        &mut self,
1527        id: GlobalId,
1528        plan: DataflowDescription<mz_compute_types::plan::Plan>,
1529    ) {
1530        let item_id = self.entry_by_global_id[&id];
1531        let entry = self.get_entry_mut(&item_id);
1532        match entry.item_mut() {
1533            CatalogItem::Index(idx) => idx.physical_plan = Some(Arc::new(plan)),
1534            CatalogItem::MaterializedView(mv) => mv.physical_plan = Some(Arc::new(plan)),
1535            CatalogItem::ContinualTask(ct) => ct.physical_plan = Some(Arc::new(plan)),
1536            other => panic!("set_physical_plan called on {} ({:?})", id, other.typ()),
1537        }
1538    }
1539
1540    /// Set the `DataflowMetainfo` for the item identified by `id`.
1541    ///
1542    /// # Panics
1543    /// If the item is not an `Index`, `MaterializedView`, or
1544    /// `ContinualTask`.
1545    pub(super) fn set_dataflow_metainfo(
1546        &mut self,
1547        id: GlobalId,
1548        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
1549    ) {
1550        // Add entries to the `notices_by_dep_id` lookup map.
1551        for notice in metainfo.optimizer_notices.iter() {
1552            for dep_id in notice.dependencies.iter() {
1553                self.notices_by_dep_id
1554                    .entry(*dep_id)
1555                    .or_default()
1556                    .push(Arc::clone(notice));
1557            }
1558            if let Some(item_id) = notice.item_id {
1559                soft_assert_eq_or_log!(
1560                    item_id,
1561                    id,
1562                    "notice.item_id should match the id for whom we are saving the notice"
1563                );
1564            }
1565        }
1566        // Set the metainfo on the catalog object.
1567        let item_id = self.entry_by_global_id[&id];
1568        let entry = self.get_entry_mut(&item_id);
1569        match entry.item_mut() {
1570            CatalogItem::Index(idx) => idx.dataflow_metainfo = Some(metainfo),
1571            CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo = Some(metainfo),
1572            CatalogItem::ContinualTask(ct) => ct.dataflow_metainfo = Some(metainfo),
1573            other => panic!("set_dataflow_metainfo called on {} ({:?})", id, other.typ()),
1574        }
1575    }
1576
    /// Clean up optimizer notices for the given dropped catalog entries.
    ///
    /// This extracts notices directly from the owned entries (which
    /// have already been removed from the catalog maps), cleans up
    /// the `notices_by_dep_id` reverse index, and removes notices
    /// from other (still-live) catalog objects that depended on the
    /// dropped items.
    ///
    /// Returns the set of all dropped notices for builtin table
    /// retraction.
    #[mz_ore::instrument(level = "trace")]
    pub(super) fn drop_optimizer_notices(
        &mut self,
        dropped_entries: Vec<CatalogEntry>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        // Accumulates every notice that needs retracting from builtin tables.
        let mut dropped_notices = BTreeSet::new();
        // All `GlobalId`s belonging to the dropped entries.
        let mut drop_ids = BTreeSet::new();

        // Extract notices directly from the owned dropped entries.
        for mut entry in dropped_entries {
            drop_ids.extend(entry.global_ids());
            // Only dataflow-backed items (indexes, materialized views,
            // continual tasks) carry a `DataflowMetainfo`.
            let metainfo = match entry.item_mut() {
                CatalogItem::Index(idx) => idx.dataflow_metainfo.take(),
                CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.take(),
                CatalogItem::ContinualTask(ct) => ct.dataflow_metainfo.take(),
                _ => None,
            };
            if let Some(mut metainfo) = metainfo {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by \
                     `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    // Clean up notices_by_dep_id for this notice's
                    // dependencies.
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.notices_by_dep_id.get_mut(dep_id) {
                            notices.retain(|x| &n != x);
                            // Drop the whole map entry once it is empty so the
                            // reverse index doesn't accumulate dead keys.
                            if notices.is_empty() {
                                self.notices_by_dep_id.remove(dep_id);
                            }
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Remove notices_by_dep_id entries keyed by the dropped IDs.
        // These are notices on OTHER items that depend on a dropped
        // item. We need to remove them from those items' metainfo.
        for id in &drop_ids {
            if let Some(notices) = self.notices_by_dep_id.remove(id) {
                for n in notices.into_iter() {
                    // Remove the notice from the catalog object it
                    // lives on (if that object still exists — it
                    // may have been dropped too, in which case the
                    // notice was already collected above).
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(entry) = self.try_get_entry_by_global_id(item_id) {
                            let catalog_item_id = entry.id();
                            let entry = self.get_entry_mut(&catalog_item_id);
                            let item = entry.item_mut();
                            match item {
                                CatalogItem::Index(idx) => {
                                    if let Some(ref mut m) = idx.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                CatalogItem::MaterializedView(mv) => {
                                    if let Some(ref mut m) = mv.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                CatalogItem::ContinualTask(ct) => {
                                    if let Some(ref mut m) = ct.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                _ => {}
                            }
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Clean up notices_by_dep_id entries for dependency IDs
        // that are NOT being dropped but had dropped notices.
        let todo_dep_ids: BTreeSet<GlobalId> = dropped_notices
            .iter()
            .flat_map(|n| n.dependencies.iter())
            .filter(|dep_id| !drop_ids.contains(dep_id))
            .copied()
            .collect();
        for id in todo_dep_ids {
            if let Some(notices) = self.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n));
                if notices.is_empty() {
                    self.notices_by_dep_id.remove(&id);
                }
            }
        }

        dropped_notices
    }
1685
1686    fn get_schema_mut(
1687        &mut self,
1688        database_spec: &ResolvedDatabaseSpecifier,
1689        schema_spec: &SchemaSpecifier,
1690        conn_id: &ConnectionId,
1691    ) -> &mut Schema {
1692        // Keep in sync with `get_schemas`
1693        match (database_spec, schema_spec) {
1694            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1695                .temporary_schemas
1696                .get_mut(conn_id)
1697                .expect("catalog out of sync"),
1698            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1699                .ambient_schemas_by_id
1700                .get_mut(id)
1701                .expect("catalog out of sync"),
1702            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1703                .database_by_id
1704                .get_mut(database_id)
1705                .expect("catalog out of sync")
1706                .schemas_by_id
1707                .get_mut(schema_id)
1708                .expect("catalog out of sync"),
1709            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1710                unreachable!("temporary schemas are in the ambient database")
1711            }
1712        }
1713    }
1714
    /// Install builtin views to the catalog. This is its own function so that views can be
    /// optimized in parallel.
    ///
    /// The implementation is similar to `apply_updates_for_bootstrap` and determines dependency
    /// problems by sniffing out specific errors and then retrying once those dependencies are
    /// complete. This doesn't work for everything (casts, function implementations) so we also need
    /// to have a bucket for everything at the end. Additionally, because this executes in parallel,
    /// we must maintain a completed set otherwise races could result in orphaned views languishing
    /// in awaiting with nothing retriggering the attempt.
    #[instrument(name = "catalog::parse_views")]
    async fn parse_builtin_views(
        state: &mut CatalogState,
        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
        // Split views into ones that replace a retracted entry (updates) and
        // genuinely new ones (additions).
        let (updates, additions): (Vec<_>, Vec<_>) =
            builtin_views
                .into_iter()
                .partition_map(|(view, item_id, gid)| {
                    match retractions.system_object_mappings.remove(&item_id) {
                        Some(entry) => Either::Left(entry),
                        None => Either::Right((view, item_id, gid)),
                    }
                });

        for entry in updates {
            // This implies that we updated the fingerprint for some builtin view. The retraction
            // was parsed, planned, and optimized using the compiled in definition, not the
            // definition from a previous version. So we can just stick the old entry back into the
            // catalog.
            let item_id = entry.id();
            state.insert_entry(entry);
            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
        }

        let mut handles = Vec::new();
        // Views blocked on a specific catalog id becoming available.
        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
            BTreeMap::new();
        // Views blocked on a specific item name becoming available.
        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
        // Some errors are due to the implementation of casts or SQL functions that depend on some
        // view. Instead of figuring out the exact view dependency, delay these until the end.
        let mut awaiting_all = Vec::new();
        // Completed views, needed to avoid race conditions.
        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
        let mut completed_names: BTreeSet<String> = BTreeSet::new();

        // Avoid some reference lifetime issues by not passing `builtin` into the spawned task.
        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
            .into_iter()
            .map(|(view, item_id, gid)| (item_id, (view, gid)))
            .collect();
        let item_ids: Vec<_> = views.keys().copied().collect();

        // Work queue of views ready to be parsed; initially all of them.
        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
            if handles.is_empty() && ready.is_empty() {
                // Enqueue the views that were waiting for all the others.
                ready.extend(awaiting_all.drain(..));
            }

            // Spawn tasks for all ready views.
            if !ready.is_empty() {
                // Clone the state once per batch so blocking tasks get a
                // consistent snapshot to parse against.
                let spawn_state = Arc::new(state.clone());
                while let Some(id) = ready.pop_front() {
                    let (view, global_id) = views.get(&id).expect("must exist");
                    let global_id = *global_id;
                    let create_sql = view.create_sql();
                    // Views can't be versioned.
                    let versions = BTreeMap::new();

                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
                    let task_state = Arc::clone(&spawn_state);
                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
                    let handle = mz_ore::task::spawn_blocking(
                        || "parse view",
                        move || {
                            span.in_scope(|| {
                                let res = task_state.parse_item_inner(
                                    global_id,
                                    &create_sql,
                                    &versions,
                                    None,
                                    false,
                                    None,
                                    cached_expr,
                                    None,
                                );
                                (id, global_id, res)
                            })
                        },
                    );
                    handles.push(handle);
                }
            }

            // Wait for a view to be ready.
            let (selected, _idx, remaining) = future::select_all(handles).await;
            handles = remaining;
            let (id, global_id, res) = selected;
            // On failure, put any cached expression back so a retry can reuse it.
            let mut insert_cached_expr = |cached_expr| {
                if let Some(cached_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
                }
            };
            match res {
                Ok((item, uncached_expr)) => {
                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                        local_expression_cache.insert_uncached_expression(
                            global_id,
                            uncached_expr,
                            optimizer_features,
                        );
                    }
                    // Add item to catalog.
                    let (view, _gid) = views.remove(&id).expect("must exist");
                    let schema_id = state
                        .ambient_schemas_by_name
                        .get(view.schema)
                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
                    let qname = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(*schema_id),
                        },
                        item: view.name.into(),
                    };
                    let mut acl_items = vec![rbac::owner_privilege(
                        mz_sql::catalog::ObjectType::View,
                        MZ_SYSTEM_ROLE_ID,
                    )];
                    acl_items.extend_from_slice(&view.access);

                    state.insert_item(
                        id,
                        view.oid,
                        qname,
                        item,
                        MZ_SYSTEM_ROLE_ID,
                        PrivilegeMap::from_mz_acl_items(acl_items),
                    );

                    // Enqueue any items waiting on this dependency.
                    let mut resolved_dependent_items = Vec::new();
                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    let entry = state.get_entry(&id);
                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    ready.extend(resolved_dependent_items);

                    completed_ids.insert(id);
                    completed_names.insert(full_name);
                }
                // If we were missing a dependency, wait for it to be added.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    // The dependency may have completed while this task ran;
                    // re-enqueue immediately in that case to avoid orphaning.
                    if completed_ids.contains(&missing_dep) {
                        ready.push_back(id);
                    } else {
                        awaiting_id_dependencies
                            .entry(missing_dep)
                            .or_default()
                            .push(id);
                    }
                }
                // If we were missing a dependency, wait for it to be added.
                Err((
                    AdapterError::PlanError(plan::PlanError::Catalog(
                        SqlCatalogError::UnknownItem(missing_dep),
                    )),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    // The unknown item may be reported by id or by name.
                    match CatalogItemId::from_str(&missing_dep) {
                        Ok(missing_dep) => {
                            if completed_ids.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_id_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                        Err(_) => {
                            if completed_names.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_name_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                    }
                }
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    // Cast errors can't be traced to a specific dependency;
                    // retry after everything else has been installed.
                    awaiting_all.push(id);
                }
                Err((e, _)) => {
                    let (bad_view, _gid) = views.get(&id).expect("must exist");
                    panic!(
                        "internal error: failed to load bootstrap view:\n\
                            {name}\n\
                            error:\n\
                            {e:?}\n\n\
                            Make sure that the schema name is specified in the builtin view's create sql statement.
                            ",
                        name = bad_view.name,
                    )
                }
            }
        }

        // All views must have been installed; anything left waiting indicates
        // an unresolvable dependency cycle or a missing builtin.
        assert!(awaiting_id_dependencies.is_empty());
        assert!(
            awaiting_name_dependencies.is_empty(),
            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
        );
        assert!(awaiting_all.is_empty());
        assert!(views.is_empty());

        // Generate a builtin table update for all the new views.
        builtin_table_updates.extend(
            item_ids
                .into_iter()
                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
        );

        builtin_table_updates
    }
1959
1960    /// Associates a name, `CatalogItemId`, and entry.
1961    fn insert_entry(&mut self, entry: CatalogEntry) {
1962        if !entry.id.is_system() {
1963            if let Some(cluster_id) = entry.item.cluster_id() {
1964                self.clusters_by_id
1965                    .get_mut(&cluster_id)
1966                    .expect("catalog out of sync")
1967                    .bound_objects
1968                    .insert(entry.id);
1969            };
1970        }
1971
1972        for u in entry.references().items() {
1973            match self.entry_by_id.get_mut(u) {
1974                Some(metadata) => metadata.referenced_by.push(entry.id()),
1975                None => panic!(
1976                    "Catalog: missing dependent catalog item {} while installing {}",
1977                    &u,
1978                    self.resolve_full_name(entry.name(), entry.conn_id())
1979                ),
1980            }
1981        }
1982        for u in entry.uses() {
1983            // Ignore self for self-referential tasks (e.g. Continual Tasks), if
1984            // present.
1985            if u == entry.id() {
1986                continue;
1987            }
1988            match self.entry_by_id.get_mut(&u) {
1989                Some(metadata) => metadata.used_by.push(entry.id()),
1990                None => panic!(
1991                    "Catalog: missing dependent catalog item {} while installing {}",
1992                    &u,
1993                    self.resolve_full_name(entry.name(), entry.conn_id())
1994                ),
1995            }
1996        }
1997        for gid in entry.item.global_ids() {
1998            self.entry_by_global_id.insert(gid, entry.id());
1999        }
2000        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
2001        // Lazily create the temporary schema if this is a temporary item and the schema
2002        // doesn't exist yet.
2003        if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
2004            && !self.temporary_schemas.contains_key(conn_id)
2005        {
2006            self.create_temporary_schema(conn_id, entry.owner_id)
2007                .expect("failed to create temporary schema");
2008        }
2009        let schema = self.get_schema_mut(
2010            &entry.name().qualifiers.database_spec,
2011            &entry.name().qualifiers.schema_spec,
2012            conn_id,
2013        );
2014
2015        let prev_id = match entry.item() {
2016            CatalogItem::Func(_) => schema
2017                .functions
2018                .insert(entry.name().item.clone(), entry.id()),
2019            CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
2020            _ => schema.items.insert(entry.name().item.clone(), entry.id()),
2021        };
2022
2023        assert!(
2024            prev_id.is_none(),
2025            "builtin name collision on {:?}",
2026            entry.name().item.clone()
2027        );
2028
2029        self.entry_by_id.insert(entry.id(), entry.clone());
2030    }
2031
2032    /// Associates a name, [`CatalogItemId`], and entry.
2033    fn insert_item(
2034        &mut self,
2035        id: CatalogItemId,
2036        oid: u32,
2037        name: QualifiedItemName,
2038        item: CatalogItem,
2039        owner_id: RoleId,
2040        privileges: PrivilegeMap,
2041    ) {
2042        let entry = CatalogEntry {
2043            item,
2044            name,
2045            id,
2046            oid,
2047            used_by: Vec::new(),
2048            referenced_by: Vec::new(),
2049            owner_id,
2050            privileges,
2051        };
2052
2053        self.insert_entry(entry);
2054    }
2055
2056    #[mz_ore::instrument(level = "trace")]
2057    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
2058        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
2059        for u in metadata.references().items() {
2060            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
2061                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
2062            }
2063        }
2064        for u in metadata.uses() {
2065            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
2066                dep_metadata.used_by.retain(|u| *u != metadata.id())
2067            }
2068        }
2069        for gid in metadata.global_ids() {
2070            self.entry_by_global_id.remove(&gid);
2071        }
2072
2073        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
2074        let schema = self.get_schema_mut(
2075            &metadata.name().qualifiers.database_spec,
2076            &metadata.name().qualifiers.schema_spec,
2077            conn_id,
2078        );
2079        if metadata.item_type() == CatalogItemType::Type {
2080            schema
2081                .types
2082                .remove(&metadata.name().item)
2083                .expect("catalog out of sync");
2084        } else {
2085            // Functions would need special handling, but we don't yet support
2086            // dropping functions.
2087            assert_ne!(metadata.item_type(), CatalogItemType::Func);
2088
2089            schema
2090                .items
2091                .remove(&metadata.name().item)
2092                .expect("catalog out of sync");
2093        };
2094
2095        if !id.is_system() {
2096            if let Some(cluster_id) = metadata.item().cluster_id() {
2097                assert!(
2098                    self.clusters_by_id
2099                        .get_mut(&cluster_id)
2100                        .expect("catalog out of sync")
2101                        .bound_objects
2102                        .remove(&id),
2103                    "catalog out of sync"
2104                );
2105            }
2106        }
2107
2108        metadata
2109    }
2110
2111    fn insert_introspection_source_index(
2112        &mut self,
2113        cluster_id: ClusterId,
2114        log: &'static BuiltinLog,
2115        item_id: CatalogItemId,
2116        global_id: GlobalId,
2117        oid: u32,
2118    ) {
2119        let (index_name, index) =
2120            self.create_introspection_source_index(cluster_id, log, global_id);
2121        self.insert_item(
2122            item_id,
2123            oid,
2124            index_name,
2125            index,
2126            MZ_SYSTEM_ROLE_ID,
2127            PrivilegeMap::default(),
2128        );
2129    }
2130
2131    fn create_introspection_source_index(
2132        &self,
2133        cluster_id: ClusterId,
2134        log: &'static BuiltinLog,
2135        global_id: GlobalId,
2136    ) -> (QualifiedItemName, CatalogItem) {
2137        let source_name = FullItemName {
2138            database: RawDatabaseSpecifier::Ambient,
2139            schema: log.schema.into(),
2140            item: log.name.into(),
2141        };
2142        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
2143        let mut index_name = QualifiedItemName {
2144            qualifiers: ItemQualifiers {
2145                database_spec: ResolvedDatabaseSpecifier::Ambient,
2146                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
2147            },
2148            item: index_name.clone(),
2149        };
2150        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
2151        let index_item_name = index_name.item.clone();
2152        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
2153        let index = CatalogItem::Index(Index {
2154            global_id,
2155            on: log_global_id,
2156            keys: log
2157                .variant
2158                .index_by()
2159                .into_iter()
2160                .map(MirScalarExpr::column)
2161                .collect(),
2162            create_sql: index_sql(
2163                index_item_name,
2164                cluster_id,
2165                source_name,
2166                &log.variant.desc(),
2167                &log.variant.index_by(),
2168            ),
2169            conn_id: None,
2170            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
2171            cluster_id,
2172            is_retained_metrics_object: false,
2173            custom_logical_compaction_window: None,
2174            optimized_plan: None,
2175            physical_plan: None,
2176            dataflow_metainfo: None,
2177        });
2178        (index_name, index)
2179    }
2180
2181    /// Insert system configuration `name` with `value`.
2182    ///
2183    /// Return a `bool` value indicating whether the configuration was modified
2184    /// by the call.
2185    fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
2186        Ok(Arc::make_mut(&mut self.system_configuration).set(name, value)?)
2187    }
2188
2189    /// Reset system configuration `name`.
2190    ///
2191    /// Return a `bool` value indicating whether the configuration was modified
2192    /// by the call.
2193    fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
2194        Ok(Arc::make_mut(&mut self.system_configuration).reset(name)?)
2195    }
2196}
2197
2198/// Sort [`StateUpdate`]s in dependency order.
2199///
2200/// # Panics
2201///
2202/// This function assumes that all provided `updates` have the same timestamp and will panic
2203/// otherwise. It also requires that the provided `updates` are consolidated, i.e. all contained
2204/// `StateUpdateKinds` are unique.
2205fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
2206    fn push_update<T>(
2207        update: T,
2208        diff: StateDiff,
2209        retractions: &mut Vec<T>,
2210        additions: &mut Vec<T>,
2211    ) {
2212        match diff {
2213            StateDiff::Retraction => retractions.push(update),
2214            StateDiff::Addition => additions.push(update),
2215        }
2216    }
2217
2218    soft_assert_no_log!(
2219        updates.iter().map(|update| update.ts).all_equal(),
2220        "all timestamps should be equal: {updates:?}"
2221    );
2222    soft_assert_no_log!(
2223        {
2224            let mut dedup = BTreeSet::new();
2225            updates.iter().all(|update| dedup.insert(&update.kind))
2226        },
2227        "updates should be consolidated: {updates:?}"
2228    );
2229
2230    // Partition updates by type so that we can weave different update types into the right spots.
2231    let mut pre_cluster_retractions = Vec::new();
2232    let mut pre_cluster_additions = Vec::new();
2233    let mut cluster_retractions = Vec::new();
2234    let mut cluster_additions = Vec::new();
2235    let mut builtin_item_updates = Vec::new();
2236    let mut item_retractions = Vec::new();
2237    let mut item_additions = Vec::new();
2238    let mut temp_item_retractions = Vec::new();
2239    let mut temp_item_additions = Vec::new();
2240    let mut post_item_retractions = Vec::new();
2241    let mut post_item_additions = Vec::new();
2242    for update in updates {
2243        let diff = update.diff.clone();
2244        match update.kind {
2245            StateUpdateKind::Role(_)
2246            | StateUpdateKind::RoleAuth(_)
2247            | StateUpdateKind::Database(_)
2248            | StateUpdateKind::Schema(_)
2249            | StateUpdateKind::DefaultPrivilege(_)
2250            | StateUpdateKind::SystemPrivilege(_)
2251            | StateUpdateKind::SystemConfiguration(_)
2252            | StateUpdateKind::NetworkPolicy(_) => push_update(
2253                update,
2254                diff,
2255                &mut pre_cluster_retractions,
2256                &mut pre_cluster_additions,
2257            ),
2258            StateUpdateKind::Cluster(_)
2259            | StateUpdateKind::IntrospectionSourceIndex(_)
2260            | StateUpdateKind::ClusterReplica(_) => push_update(
2261                update,
2262                diff,
2263                &mut cluster_retractions,
2264                &mut cluster_additions,
2265            ),
2266            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
2267                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
2268            }
2269            StateUpdateKind::TemporaryItem(item) => push_update(
2270                (item, update.ts, update.diff),
2271                diff,
2272                &mut temp_item_retractions,
2273                &mut temp_item_additions,
2274            ),
2275            StateUpdateKind::Item(item) => push_update(
2276                (item, update.ts, update.diff),
2277                diff,
2278                &mut item_retractions,
2279                &mut item_additions,
2280            ),
2281            StateUpdateKind::Comment(_)
2282            | StateUpdateKind::SourceReferences(_)
2283            | StateUpdateKind::AuditLog(_)
2284            | StateUpdateKind::StorageCollectionMetadata(_)
2285            | StateUpdateKind::UnfinalizedShard(_) => push_update(
2286                update,
2287                diff,
2288                &mut post_item_retractions,
2289                &mut post_item_additions,
2290            ),
2291        }
2292    }
2293
2294    // Sort builtin item updates by dependency. The builtin definition order must be a dependency
2295    // order, so we can use that to avoid a more expensive topo sorting.
2296    let builtin_item_updates = builtin_item_updates
2297        .into_iter()
2298        .map(|(system_object_mapping, ts, diff)| {
2299            let idx = BUILTIN_LOOKUP
2300                .get(&system_object_mapping.description)
2301                .expect("missing builtin")
2302                .0;
2303            (idx, system_object_mapping, ts, diff)
2304        })
2305        .sorted_by_key(|(idx, _, _, _)| *idx)
2306        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2307
2308    // Partition builtins based on whether or not they should be applied before or after clusters.
2309    // Clusters depend on sources (introspection logs), so sources need to be applied first.
2310    // Everything else can be applied after clusters.
2311    let mut builtin_source_retractions = Vec::new();
2312    let mut builtin_source_additions = Vec::new();
2313    let mut other_builtin_retractions = Vec::new();
2314    let mut other_builtin_additions = Vec::new();
2315    for (builtin_item_update, ts, diff) in builtin_item_updates {
2316        let object_type = builtin_item_update.description.object_type;
2317        let update = StateUpdate {
2318            kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2319            ts,
2320            diff,
2321        };
2322        if object_type == CatalogItemType::Source {
2323            push_update(
2324                update,
2325                diff,
2326                &mut builtin_source_retractions,
2327                &mut builtin_source_additions,
2328            );
2329        } else {
2330            push_update(
2331                update,
2332                diff,
2333                &mut other_builtin_retractions,
2334                &mut other_builtin_additions,
2335            );
2336        }
2337    }
2338
2339    /// Sort items by their dependencies using topological sort.
2340    ///
2341    /// # Panics
2342    ///
2343    /// This function requires that all provided items have unique item IDs.
2344    fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2345        tracing::debug!(?items, "sorting items by dependencies");
2346
2347        let key_fn = |item: &(mz_catalog::durable::Item, _, _)| item.0.id;
2348        let dependencies_fn = |item: &(mz_catalog::durable::Item, _, _)| {
2349            let statement = mz_sql::parse::parse(&item.0.create_sql)
2350                .expect("valid create_sql")
2351                .into_element()
2352                .ast;
2353            mz_sql::names::dependencies(&statement).expect("failed to find dependencies of item")
2354        };
2355        sort_topological(items, key_fn, dependencies_fn);
2356    }
2357
2358    /// Sort item updates by dependency.
2359    ///
2360    /// First we group items into groups that are totally ordered by dependency. For example, when
2361    /// sorting all items by dependency we know that all tables can come after all sources, because
2362    /// a source can never depend on a table. Second, we sort the items in each group in
2363    /// topological order, or by ID, depending on the type.
2364    ///
2365    /// It used to be the case that the ID order of ALL items matched the dependency order. However,
2366    /// certain migrations shuffled item IDs around s.t. this was no longer true. A much better
2367    /// approach would be to investigate each item, discover their exact dependencies, and then
2368    /// perform a topological sort. This is non-trivial because we only have the CREATE SQL of each
2369    /// item here. Within the SQL the dependent items are sometimes referred to by ID and sometimes
2370    /// referred to by name.
2371    ///
2372    /// The logic of this function should match [`sort_temp_item_updates`].
2373    fn sort_item_updates(
2374        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2375    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2376        // Partition items into groups s.t. each item in one group has a predefined order with all
2377        // items in other groups. For example, all sinks are ordered greater than all tables.
2378        let mut types = Vec::new();
2379        // N.B. Functions can depend on system tables, but not user tables.
2380        // TODO(udf): This will change when UDFs are supported.
2381        let mut funcs = Vec::new();
2382        let mut secrets = Vec::new();
2383        let mut connections = Vec::new();
2384        let mut sources = Vec::new();
2385        let mut tables = Vec::new();
2386        let mut derived_items = Vec::new();
2387        let mut sinks = Vec::new();
2388        let mut continual_tasks = Vec::new();
2389
2390        for update in item_updates {
2391            match update.0.item_type() {
2392                CatalogItemType::Type => types.push(update),
2393                CatalogItemType::Func => funcs.push(update),
2394                CatalogItemType::Secret => secrets.push(update),
2395                CatalogItemType::Connection => connections.push(update),
2396                CatalogItemType::Source => sources.push(update),
2397                CatalogItemType::Table => tables.push(update),
2398                CatalogItemType::View
2399                | CatalogItemType::MaterializedView
2400                | CatalogItemType::Index => derived_items.push(update),
2401                CatalogItemType::Sink => sinks.push(update),
2402                CatalogItemType::ContinualTask => continual_tasks.push(update),
2403            }
2404        }
2405
2406        // For some groups, the items in them can depend on each other and can be `ALTER`ed so that
2407        // an item ends up depending on an item with a greater ID. Thus we need to perform
2408        // topological sort for these groups.
2409        sort_items_topological(&mut connections);
2410        sort_items_topological(&mut derived_items);
2411
2412        // Other groups we can simply sort by ID.
2413        for group in [
2414            &mut types,
2415            &mut funcs,
2416            &mut secrets,
2417            &mut sources,
2418            &mut tables,
2419            &mut sinks,
2420            &mut continual_tasks,
2421        ] {
2422            group.sort_by_key(|(item, _, _)| item.id);
2423        }
2424
2425        iter::empty()
2426            .chain(types)
2427            .chain(funcs)
2428            .chain(secrets)
2429            .chain(connections)
2430            .chain(sources)
2431            .chain(tables)
2432            .chain(derived_items)
2433            .chain(sinks)
2434            .chain(continual_tasks)
2435            .collect()
2436    }
2437
2438    let item_retractions = sort_item_updates(item_retractions);
2439    let item_additions = sort_item_updates(item_additions);
2440
2441    /// Sort temporary item updates by dependency.
2442    ///
2443    /// The logic of this function should match [`sort_item_updates`].
2444    fn sort_temp_item_updates(
2445        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2446    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2447        // Partition items into groups s.t. each item in one group has a predefined order with all
2448        // items in other groups. For example, all sinks are ordered greater than all tables.
2449        let mut types = Vec::new();
2450        // N.B. Functions can depend on system tables, but not user tables.
2451        let mut funcs = Vec::new();
2452        let mut secrets = Vec::new();
2453        let mut connections = Vec::new();
2454        let mut sources = Vec::new();
2455        let mut tables = Vec::new();
2456        let mut derived_items = Vec::new();
2457        let mut sinks = Vec::new();
2458        let mut continual_tasks = Vec::new();
2459
2460        for update in temp_item_updates {
2461            match update.0.item_type() {
2462                CatalogItemType::Type => types.push(update),
2463                CatalogItemType::Func => funcs.push(update),
2464                CatalogItemType::Secret => secrets.push(update),
2465                CatalogItemType::Connection => connections.push(update),
2466                CatalogItemType::Source => sources.push(update),
2467                CatalogItemType::Table => tables.push(update),
2468                CatalogItemType::View
2469                | CatalogItemType::MaterializedView
2470                | CatalogItemType::Index => derived_items.push(update),
2471                CatalogItemType::Sink => sinks.push(update),
2472                CatalogItemType::ContinualTask => continual_tasks.push(update),
2473            }
2474        }
2475
2476        // Within each group, sort by ID.
2477        for group in [
2478            &mut types,
2479            &mut funcs,
2480            &mut secrets,
2481            &mut connections,
2482            &mut sources,
2483            &mut tables,
2484            &mut derived_items,
2485            &mut sinks,
2486            &mut continual_tasks,
2487        ] {
2488            group.sort_by_key(|(item, _, _)| item.id);
2489        }
2490
2491        iter::empty()
2492            .chain(types)
2493            .chain(funcs)
2494            .chain(secrets)
2495            .chain(connections)
2496            .chain(sources)
2497            .chain(tables)
2498            .chain(derived_items)
2499            .chain(sinks)
2500            .chain(continual_tasks)
2501            .collect()
2502    }
2503    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2504    let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2505
2506    /// Merge sorted temporary and non-temp items.
2507    fn merge_item_updates(
2508        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2509        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2510    ) -> Vec<StateUpdate> {
2511        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2512
2513        while let (Some((item, _, _)), Some((temp_item, _, _))) =
2514            (item_updates.front(), temp_item_updates.front())
2515        {
2516            if item.id < temp_item.id {
2517                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2518                state_updates.push(StateUpdate {
2519                    kind: StateUpdateKind::Item(item),
2520                    ts,
2521                    diff,
2522                });
2523            } else if item.id > temp_item.id {
2524                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2525                state_updates.push(StateUpdate {
2526                    kind: StateUpdateKind::TemporaryItem(temp_item),
2527                    ts,
2528                    diff,
2529                });
2530            } else {
2531                unreachable!(
2532                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2533                );
2534            }
2535        }
2536
2537        while let Some((item, ts, diff)) = item_updates.pop_front() {
2538            state_updates.push(StateUpdate {
2539                kind: StateUpdateKind::Item(item),
2540                ts,
2541                diff,
2542            });
2543        }
2544
2545        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2546            state_updates.push(StateUpdate {
2547                kind: StateUpdateKind::TemporaryItem(temp_item),
2548                ts,
2549                diff,
2550            });
2551        }
2552
2553        state_updates
2554    }
2555    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2556    let item_additions = merge_item_updates(item_additions, temp_item_additions);
2557
2558    // Put everything back together.
2559    iter::empty()
2560        // All retractions must be reversed.
2561        .chain(post_item_retractions.into_iter().rev())
2562        .chain(item_retractions.into_iter().rev())
2563        .chain(other_builtin_retractions.into_iter().rev())
2564        .chain(cluster_retractions.into_iter().rev())
2565        .chain(builtin_source_retractions.into_iter().rev())
2566        .chain(pre_cluster_retractions.into_iter().rev())
2567        .chain(pre_cluster_additions)
2568        .chain(builtin_source_additions)
2569        .chain(cluster_additions)
2570        .chain(other_builtin_additions)
2571        .chain(item_additions)
2572        .chain(post_item_additions)
2573        .collect()
2574}
2575
/// Groups of updates of certain types are applied in batches to improve
/// performance. A constraint is that updates must be applied in order. This
/// process is modeled as a state machine that batches then applies groups of
/// updates.
enum ApplyState {
    /// Additions of builtin views.
    ///
    /// Batched separately so they can be handed to `parse_builtin_views` as a
    /// single group; each entry pairs the static builtin definition with the
    /// item ID and global ID assigned to it.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Item updates that aren't builtin view additions.
    ///
    /// This contains all updates whose application requires calling
    /// `parse_item` and thus toggling the `enable_for_item_parsing` feature
    /// flags.
    Items(Vec<StateUpdate>),
    /// All other updates.
    Updates(Vec<StateUpdate>),
}
2592
2593impl ApplyState {
2594    fn new(update: StateUpdate) -> Self {
2595        use StateUpdateKind::*;
2596        match &update.kind {
2597            SystemObjectMapping(som)
2598                if som.description.object_type == CatalogItemType::View
2599                    && update.diff == StateDiff::Addition =>
2600            {
2601                let view_addition = lookup_builtin_view_addition(som.clone());
2602                Self::BuiltinViewAdditions(vec![view_addition])
2603            }
2604
2605            IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
2606                Self::Items(vec![update])
2607            }
2608
2609            Role(_)
2610            | RoleAuth(_)
2611            | Database(_)
2612            | Schema(_)
2613            | DefaultPrivilege(_)
2614            | SystemPrivilege(_)
2615            | SystemConfiguration(_)
2616            | Cluster(_)
2617            | NetworkPolicy(_)
2618            | ClusterReplica(_)
2619            | SourceReferences(_)
2620            | Comment(_)
2621            | AuditLog(_)
2622            | StorageCollectionMetadata(_)
2623            | UnfinalizedShard(_) => Self::Updates(vec![update]),
2624        }
2625    }
2626
2627    /// Apply all updates that have been batched in `self`.
2628    ///
2629    /// We make sure to enable all "enable_for_item_parsing" feature flags when applying item
2630    /// updates during bootstrap. See [`CatalogState::with_enable_for_item_parsing`] for more
2631    /// details.
2632    async fn apply(
2633        self,
2634        state: &mut CatalogState,
2635        retractions: &mut InProgressRetractions,
2636        local_expression_cache: &mut LocalExpressionCache,
2637    ) -> (
2638        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2639        Vec<ParsedStateUpdate>,
2640    ) {
2641        match self {
2642            Self::BuiltinViewAdditions(builtin_view_additions) => {
2643                let restore = Arc::clone(&state.system_configuration);
2644                Arc::make_mut(&mut state.system_configuration).enable_for_item_parsing();
2645                let builtin_table_updates = CatalogState::parse_builtin_views(
2646                    state,
2647                    builtin_view_additions,
2648                    retractions,
2649                    local_expression_cache,
2650                )
2651                .await;
2652                state.system_configuration = restore;
2653                (builtin_table_updates, Vec::new())
2654            }
2655            Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
2656                state
2657                    .apply_updates_inner(updates, retractions, local_expression_cache)
2658                    .expect("corrupt catalog")
2659            }),
2660            Self::Updates(updates) => state
2661                .apply_updates_inner(updates, retractions, local_expression_cache)
2662                .expect("corrupt catalog"),
2663        }
2664    }
2665
2666    async fn step(
2667        self,
2668        next: Self,
2669        state: &mut CatalogState,
2670        retractions: &mut InProgressRetractions,
2671        local_expression_cache: &mut LocalExpressionCache,
2672    ) -> (
2673        Self,
2674        (
2675            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2676            Vec<ParsedStateUpdate>,
2677        ),
2678    ) {
2679        match (self, next) {
2680            (
2681                Self::BuiltinViewAdditions(mut builtin_view_additions),
2682                Self::BuiltinViewAdditions(next_builtin_view_additions),
2683            ) => {
2684                // Continue batching builtin view additions.
2685                builtin_view_additions.extend(next_builtin_view_additions);
2686                (
2687                    Self::BuiltinViewAdditions(builtin_view_additions),
2688                    (Vec::new(), Vec::new()),
2689                )
2690            }
2691            (Self::Items(mut updates), Self::Items(next_updates)) => {
2692                // Continue batching item updates.
2693                updates.extend(next_updates);
2694                (Self::Items(updates), (Vec::new(), Vec::new()))
2695            }
2696            (Self::Updates(mut updates), Self::Updates(next_updates)) => {
2697                // Continue batching updates.
2698                updates.extend(next_updates);
2699                (Self::Updates(updates), (Vec::new(), Vec::new()))
2700            }
2701            (apply_state, next_apply_state) => {
2702                // Apply the current batch and start batching new apply state.
2703                let updates = apply_state
2704                    .apply(state, retractions, local_expression_cache)
2705                    .await;
2706                (next_apply_state, updates)
2707            }
2708        }
2709    }
2710}
2711
/// Trait abstracting over map operations needed by [`apply_inverted_lookup`] and
/// [`apply_with_update`]. Both [`BTreeMap`] and [`imbl::OrdMap`] implement this.
trait MutableMap<K, V> {
    fn insert(&mut self, key: K, value: V) -> Option<V>;
    fn remove(&mut self, key: &K) -> Option<V>;
}

/// [`BTreeMap`] support: defer to the inherent methods, whose signatures match
/// the trait exactly.
impl<K: Ord, V> MutableMap<K, V> for BTreeMap<K, V> {
    fn insert(&mut self, key: K, value: V) -> Option<V> {
        // Inherent methods win method resolution over trait methods, so this
        // calls `BTreeMap::insert` rather than recursing.
        self.insert(key, value)
    }
    fn remove(&mut self, key: &K) -> Option<V> {
        self.remove(key)
    }
}
2727
2728impl<K: Ord + Clone, V: Clone> MutableMap<K, V> for imbl::OrdMap<K, V> {
2729    fn insert(&mut self, key: K, value: V) -> Option<V> {
2730        imbl::OrdMap::insert(self, key, value)
2731    }
2732    fn remove(&mut self, key: &K) -> Option<V> {
2733        imbl::OrdMap::remove(self, key)
2734    }
2735}
2736
2737/// Helper method to updated inverted lookup maps. The keys are generally names and the values are
2738/// generally IDs.
2739///
2740/// Importantly, when retracting it's expected that the existing value will match `value` exactly.
2741fn apply_inverted_lookup<K, V>(map: &mut impl MutableMap<K, V>, key: &K, value: V, diff: StateDiff)
2742where
2743    K: Ord + Clone + Debug,
2744    V: PartialEq + Debug,
2745{
2746    match diff {
2747        StateDiff::Retraction => {
2748            let prev = map.remove(key);
2749            assert_eq!(
2750                prev,
2751                Some(value),
2752                "retraction does not match existing value: {key:?}"
2753            );
2754        }
2755        StateDiff::Addition => {
2756            let prev = map.insert(key.clone(), value);
2757            assert_eq!(
2758                prev, None,
2759                "values must be explicitly retracted before inserting a new value: {key:?}"
2760            );
2761        }
2762    }
2763}
2764
2765/// Helper method to update catalog state, that may need to be updated from a previously retracted
2766/// object.
2767fn apply_with_update<K, V, D>(
2768    map: &mut impl MutableMap<K, V>,
2769    durable: D,
2770    key_fn: impl FnOnce(&D) -> K,
2771    diff: StateDiff,
2772    retractions: &mut BTreeMap<D::Key, V>,
2773) where
2774    K: Ord,
2775    V: UpdateFrom<D> + PartialEq + Debug,
2776    D: DurableType,
2777    D::Key: Ord,
2778{
2779    match diff {
2780        StateDiff::Retraction => {
2781            let mem_key = key_fn(&durable);
2782            let value = map
2783                .remove(&mem_key)
2784                .expect("retraction does not match existing value: {key:?}");
2785            let durable_key = durable.into_key_value().0;
2786            retractions.insert(durable_key, value);
2787        }
2788        StateDiff::Addition => {
2789            let mem_key = key_fn(&durable);
2790            let durable_key = durable.key();
2791            let value = match retractions.remove(&durable_key) {
2792                Some(mut retraction) => {
2793                    retraction.update_from(durable);
2794                    retraction
2795                }
2796                None => durable.into(),
2797            };
2798            let prev = map.insert(mem_key, value);
2799            assert_eq!(
2800                prev, None,
2801                "values must be explicitly retracted before inserting a new value"
2802            );
2803        }
2804    }
2805}
2806
2807/// Looks up a [`BuiltinView`] from a [`SystemObjectMapping`].
2808fn lookup_builtin_view_addition(
2809    mapping: SystemObjectMapping,
2810) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2811    let (_, builtin) = BUILTIN_LOOKUP
2812        .get(&mapping.description)
2813        .expect("missing builtin view");
2814    let Builtin::View(view) = builtin else {
2815        unreachable!("programming error, expected BuiltinView found {builtin:?}");
2816    };
2817
2818    (
2819        view,
2820        mapping.unique_identifier.catalog_id,
2821        mapping.unique_identifier.global_id,
2822    )
2823}