Skip to main content

mz_adapter/catalog/
apply.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to applying updates from a [`mz_catalog::durable::DurableCatalogState`] to a
//! [`CatalogState`].

13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25    BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28    ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29    SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, Func, Index, Log, NetworkPolicy,
35    Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36    TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_compute_types::dataflows::DataflowDescription;
40use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_ore::collections::CollectionExt;
44use mz_ore::tracing::OpenTelemetryContext;
45use mz_ore::{
46    instrument, soft_assert_eq_or_log, soft_assert_no_log, soft_assert_or_log, soft_panic_or_log,
47};
48use mz_pgrepr::oid::INVALID_OID;
49use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
50use mz_repr::role_id::RoleId;
51use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
52use mz_sql::catalog::CatalogError as SqlCatalogError;
53use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
54use mz_sql::names::{
55    FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
56    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
57};
58use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
59use mz_sql::session::vars::{VarError, VarInput};
60use mz_sql::{plan, rbac};
61use mz_sql_parser::ast::Expr;
62use mz_storage_types::sources::Timeline;
63use mz_transform::dataflow::DataflowMetainfo;
64use mz_transform::notice::OptimizerNotice;
65use tracing::{info_span, warn};
66
67use crate::AdapterError;
68use crate::catalog::state::LocalExpressionCache;
69use crate::catalog::{BuiltinTableUpdate, CatalogState};
70use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
71use crate::util::{index_sql, sort_topological};
72
73/// Maintains the state of retractions while applying catalog state updates for a single timestamp.
74/// [`CatalogState`] maintains denormalized state for certain catalog objects. Updating an object
75/// results in applying a retraction for that object followed by applying an addition for that
76/// object. When applying those additions it can be extremely expensive to re-build that
77/// denormalized state from scratch. To avoid that issue we stash the denormalized state from
78/// retractions, so it can be used during additions.
79///
80/// Not all objects maintain denormalized state, so we only stash the retractions for the subset of
81/// objects that maintain denormalized state.
82// TODO(jkosh44) It might be simpler or more future proof to include all object types here, even if
83// the update step is a no-op for certain types.
84#[derive(Debug, Clone, Default)]
85struct InProgressRetractions {
86    roles: BTreeMap<RoleKey, Role>,
87    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
88    databases: BTreeMap<DatabaseKey, Database>,
89    schemas: BTreeMap<SchemaKey, Schema>,
90    clusters: BTreeMap<ClusterKey, Cluster>,
91    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
92    items: BTreeMap<ItemKey, CatalogEntry>,
93    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
94    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
95    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
96}
97
98impl CatalogState {
99    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
100    ///
101    /// Returns builtin table updates corresponding to the changes to catalog state.
102    #[must_use]
103    #[instrument]
104    pub(crate) async fn apply_updates(
105        &mut self,
106        updates: Vec<StateUpdate>,
107        local_expression_cache: &mut LocalExpressionCache,
108    ) -> (
109        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
110        Vec<ParsedStateUpdate>,
111    ) {
112        let mut builtin_table_updates = Vec::with_capacity(updates.len());
113        let mut catalog_updates = Vec::with_capacity(updates.len());
114
115        // First, consolidate updates. The code that applies parsed state
116        // updates _requires_ that the given updates are consolidated. There
117        // must be at most one addition and/or one retraction for a given item,
118        // as identified by that items ID type.
119        let updates = Self::consolidate_updates(updates);
120
121        // Apply updates in groups, according to their timestamps.
122        let mut groups: Vec<Vec<_>> = Vec::new();
123        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
124            // Bring the updates into the pseudo-topological order that we need
125            // for updating our in-memory state and generating builtin table
126            // updates.
127            let updates = sort_updates(updates.collect());
128            groups.push(updates);
129        }
130
131        for updates in groups {
132            let mut apply_state = ApplyState::Updates(Vec::new());
133            let mut retractions = InProgressRetractions::default();
134
135            for update in updates {
136                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
137                    .step(
138                        ApplyState::new(update),
139                        self,
140                        &mut retractions,
141                        local_expression_cache,
142                    )
143                    .await;
144                apply_state = next_apply_state;
145                builtin_table_updates.extend(builtin_table_update);
146                catalog_updates.extend(catalog_update);
147            }
148
149            // Apply remaining state.
150            let (builtin_table_update, catalog_update) = apply_state
151                .apply(self, &mut retractions, local_expression_cache)
152                .await;
153            builtin_table_updates.extend(builtin_table_update);
154            catalog_updates.extend(catalog_update);
155
156            // Clean up plans and optimizer notices for items that
157            // were retracted but not replaced (i.e., truly dropped).
158            let dropped_entries: Vec<CatalogEntry> = retractions
159                .items
160                .into_values()
161                .chain(retractions.temp_items.into_values())
162                .collect();
163            if !dropped_entries.is_empty() {
164                let dropped_notices = self.drop_optimizer_notices(dropped_entries);
165                if self.system_config().enable_mz_notices() {
166                    self.pack_optimizer_notice_updates(
167                        &mut builtin_table_updates,
168                        dropped_notices.iter(),
169                        Diff::MINUS_ONE,
170                    );
171                }
172            }
173        }
174
175        (builtin_table_updates, catalog_updates)
176    }
177
178    /// It can happen that the sequencing logic creates "fluctuating" updates
179    /// for a given catalog ID. For example, when doing a `DROP OWNED BY ...`,
180    /// for a table, there will be a retraction of the original table state,
181    /// then an addition for the same table but stripped of some of the roles
182    /// and access things, and then a retraction for that intermediate table
183    /// state. By consolidating, the intermediate state addition/retraction will
184    /// cancel out and we'll only see the retraction for the original state.
185    fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
186        let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
187            .into_iter()
188            .map(|update| (update.kind, update.ts, update.diff.into()))
189            .collect_vec();
190
191        consolidate_updates(&mut updates);
192
193        updates
194            .into_iter()
195            .map(|(kind, ts, diff)| StateUpdate {
196                kind,
197                ts,
198                diff: diff
199                    .try_into()
200                    .expect("catalog state cannot have diff other than -1 or 1"),
201            })
202            .collect_vec()
203    }
204
205    #[instrument(level = "debug")]
206    fn apply_updates_inner(
207        &mut self,
208        updates: Vec<StateUpdate>,
209        retractions: &mut InProgressRetractions,
210        local_expression_cache: &mut LocalExpressionCache,
211    ) -> Result<
212        (
213            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
214            Vec<ParsedStateUpdate>,
215        ),
216        CatalogError,
217    > {
218        soft_assert_no_log!(
219            updates.iter().map(|update| update.ts).all_equal(),
220            "all timestamps should be equal: {updates:?}"
221        );
222
223        let mut update_system_config = false;
224
225        let mut builtin_table_updates = Vec::with_capacity(updates.len());
226        let mut catalog_updates = Vec::new();
227
228        for state_update in updates {
229            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
230                update_system_config = true;
231            }
232
233            match state_update.diff {
234                StateDiff::Retraction => {
235                    // We want the parsed catalog updates to match the state of
236                    // the catalog _before_ applying a retraction. So that we can
237                    // still have useful in-memory state to work with.
238                    if let Some(update) =
239                        parsed_state_updates::parse_state_update(self, state_update.clone())
240                    {
241                        catalog_updates.push(update);
242                    }
243
244                    // We want the builtin table retraction to match the state of the catalog
245                    // before applying the update.
246                    builtin_table_updates.extend(self.generate_builtin_table_update(
247                        state_update.kind.clone(),
248                        state_update.diff,
249                    ));
250                    self.apply_update(
251                        state_update.kind,
252                        state_update.diff,
253                        retractions,
254                        local_expression_cache,
255                    )?;
256                }
257                StateDiff::Addition => {
258                    self.apply_update(
259                        state_update.kind.clone(),
260                        state_update.diff,
261                        retractions,
262                        local_expression_cache,
263                    )?;
264                    // We want the builtin table addition to match the state of
265                    // the catalog after applying the update. So that we already
266                    // have useful in-memory state to work with.
267                    builtin_table_updates.extend(self.generate_builtin_table_update(
268                        state_update.kind.clone(),
269                        state_update.diff,
270                    ));
271
272                    // We want the parsed catalog updates to match the state of
273                    // the catalog _after_ applying an addition.
274                    if let Some(update) =
275                        parsed_state_updates::parse_state_update(self, state_update.clone())
276                    {
277                        catalog_updates.push(update);
278                    }
279                }
280            }
281        }
282
283        if update_system_config {
284            self.system_configuration.dyncfg_updates();
285        }
286
287        Ok((builtin_table_updates, catalog_updates))
288    }
289
290    #[instrument(level = "debug")]
291    fn apply_update(
292        &mut self,
293        kind: StateUpdateKind,
294        diff: StateDiff,
295        retractions: &mut InProgressRetractions,
296        local_expression_cache: &mut LocalExpressionCache,
297    ) -> Result<(), CatalogError> {
298        match kind {
299            StateUpdateKind::Role(role) => {
300                self.apply_role_update(role, diff, retractions);
301            }
302            StateUpdateKind::RoleAuth(role_auth) => {
303                self.apply_role_auth_update(role_auth, diff, retractions);
304            }
305            StateUpdateKind::Database(database) => {
306                self.apply_database_update(database, diff, retractions);
307            }
308            StateUpdateKind::Schema(schema) => {
309                self.apply_schema_update(schema, diff, retractions);
310            }
311            StateUpdateKind::DefaultPrivilege(default_privilege) => {
312                self.apply_default_privilege_update(default_privilege, diff, retractions);
313            }
314            StateUpdateKind::SystemPrivilege(system_privilege) => {
315                self.apply_system_privilege_update(system_privilege, diff, retractions);
316            }
317            StateUpdateKind::SystemConfiguration(system_configuration) => {
318                self.apply_system_configuration_update(system_configuration, diff, retractions);
319            }
320            StateUpdateKind::ClusterSystemConfiguration(cfg) => {
321                Self::apply_scoped_system_configuration_update(
322                    &mut self.scoped_system_parameters.cluster,
323                    cfg.cluster_id,
324                    cfg.name,
325                    cfg.value,
326                    diff,
327                );
328            }
329            StateUpdateKind::ReplicaSystemConfiguration(cfg) => {
330                Self::apply_scoped_system_configuration_update(
331                    &mut self.scoped_system_parameters.replica,
332                    cfg.replica_id,
333                    cfg.name,
334                    cfg.value,
335                    diff,
336                );
337            }
338            StateUpdateKind::Cluster(cluster) => {
339                self.apply_cluster_update(cluster, diff, retractions);
340            }
341            StateUpdateKind::NetworkPolicy(network_policy) => {
342                self.apply_network_policy_update(network_policy, diff, retractions);
343            }
344            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
345                self.apply_introspection_source_index_update(
346                    introspection_source_index,
347                    diff,
348                    retractions,
349                );
350            }
351            StateUpdateKind::ClusterReplica(cluster_replica) => {
352                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
353            }
354            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
355                self.apply_system_object_mapping_update(
356                    system_object_mapping,
357                    diff,
358                    retractions,
359                    local_expression_cache,
360                );
361            }
362            StateUpdateKind::TemporaryItem(item) => {
363                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
364            }
365            StateUpdateKind::Item(item) => {
366                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
367            }
368            StateUpdateKind::Comment(comment) => {
369                self.apply_comment_update(comment, diff, retractions);
370            }
371            StateUpdateKind::SourceReferences(source_reference) => {
372                self.apply_source_references_update(source_reference, diff, retractions);
373            }
374            StateUpdateKind::AuditLog(_audit_log) => {
375                // Audit logs are not stored in-memory.
376            }
377            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
378                self.apply_storage_collection_metadata_update(
379                    storage_collection_metadata,
380                    diff,
381                    retractions,
382                );
383            }
384            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
385                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
386            }
387        }
388
389        Ok(())
390    }
391
392    #[instrument(level = "debug")]
393    fn apply_role_auth_update(
394        &mut self,
395        role_auth: mz_catalog::durable::RoleAuth,
396        diff: StateDiff,
397        retractions: &mut InProgressRetractions,
398    ) {
399        apply_with_update(
400            &mut self.role_auth_by_id,
401            role_auth,
402            |role_auth| role_auth.role_id,
403            diff,
404            &mut retractions.role_auths,
405        );
406    }
407
408    #[instrument(level = "debug")]
409    fn apply_role_update(
410        &mut self,
411        role: mz_catalog::durable::Role,
412        diff: StateDiff,
413        retractions: &mut InProgressRetractions,
414    ) {
415        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
416        apply_with_update(
417            &mut self.roles_by_id,
418            role,
419            |role| role.id,
420            diff,
421            &mut retractions.roles,
422        );
423    }
424
425    #[instrument(level = "debug")]
426    fn apply_database_update(
427        &mut self,
428        database: mz_catalog::durable::Database,
429        diff: StateDiff,
430        retractions: &mut InProgressRetractions,
431    ) {
432        apply_inverted_lookup(
433            &mut self.database_by_name,
434            &database.name,
435            database.id,
436            diff,
437        );
438        apply_with_update(
439            &mut self.database_by_id,
440            database,
441            |database| database.id,
442            diff,
443            &mut retractions.databases,
444        );
445    }
446
447    #[instrument(level = "debug")]
448    fn apply_schema_update(
449        &mut self,
450        schema: mz_catalog::durable::Schema,
451        diff: StateDiff,
452        retractions: &mut InProgressRetractions,
453    ) {
454        match &schema.database_id {
455            Some(database_id) => {
456                let db = self
457                    .database_by_id
458                    .get_mut(database_id)
459                    .expect("catalog out of sync");
460                apply_inverted_lookup(&mut db.schemas_by_name, &schema.name, schema.id, diff);
461                apply_with_update(
462                    &mut db.schemas_by_id,
463                    schema,
464                    |schema| schema.id,
465                    diff,
466                    &mut retractions.schemas,
467                );
468            }
469            None => {
470                apply_inverted_lookup(
471                    &mut self.ambient_schemas_by_name,
472                    &schema.name,
473                    schema.id,
474                    diff,
475                );
476                apply_with_update(
477                    &mut self.ambient_schemas_by_id,
478                    schema,
479                    |schema| schema.id,
480                    diff,
481                    &mut retractions.schemas,
482                );
483            }
484        }
485    }
486
487    #[instrument(level = "debug")]
488    fn apply_default_privilege_update(
489        &mut self,
490        default_privilege: mz_catalog::durable::DefaultPrivilege,
491        diff: StateDiff,
492        _retractions: &mut InProgressRetractions,
493    ) {
494        match diff {
495            StateDiff::Addition => Arc::make_mut(&mut self.default_privileges)
496                .grant(default_privilege.object, default_privilege.acl_item),
497            StateDiff::Retraction => Arc::make_mut(&mut self.default_privileges)
498                .revoke(&default_privilege.object, &default_privilege.acl_item),
499        }
500    }
501
502    #[instrument(level = "debug")]
503    fn apply_system_privilege_update(
504        &mut self,
505        system_privilege: MzAclItem,
506        diff: StateDiff,
507        _retractions: &mut InProgressRetractions,
508    ) {
509        match diff {
510            StateDiff::Addition => {
511                Arc::make_mut(&mut self.system_privileges).grant(system_privilege)
512            }
513            StateDiff::Retraction => {
514                Arc::make_mut(&mut self.system_privileges).revoke(&system_privilege)
515            }
516        }
517    }
518
519    #[instrument(level = "debug")]
520    fn apply_system_configuration_update(
521        &mut self,
522        system_configuration: mz_catalog::durable::SystemConfiguration,
523        diff: StateDiff,
524        _retractions: &mut InProgressRetractions,
525    ) {
526        let res = match diff {
527            StateDiff::Addition => self.insert_system_configuration(
528                &system_configuration.name,
529                VarInput::Flat(&system_configuration.value),
530            ),
531            StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
532        };
533        match res {
534            Ok(_) => (),
535            // When system variables are deleted, nothing deletes them from the underlying
536            // durable catalog, which isn't great. Still, we need to be able to ignore
537            // unknown variables.
538            Err(Error {
539                kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
540            }) => {
541                warn!(%name, "unknown system parameter from catalog storage");
542            }
543            Err(e) => panic!("unable to update system variable: {e:?}"),
544        }
545    }
546
547    /// Applies a durable update to one of the in-memory scoped-parameter working
548    /// copies (`scoped_system_parameters.{cluster,replica}`), keyed by object
549    /// id. Mirrors the durable `{cluster,replica}_system_configurations`
550    /// collections; the cluster copy is consumed at plan time via
551    /// [`CatalogState::cluster_scoped_optimizer_overrides`], the replica copy at
552    /// the compute controller's per-replica dyncfg push.
553    ///
554    /// Retraction is conditional on the value matching, so a value change
555    /// (retraction of the old value + addition of the new) is correct regardless
556    /// of the order the two updates are applied in. See the scoped feature flags
557    /// design.
558    ///
559    /// [`CatalogState::cluster_scoped_optimizer_overrides`]: crate::catalog::CatalogState::cluster_scoped_optimizer_overrides
560    fn apply_scoped_system_configuration_update<Id: Ord>(
561        map: &mut BTreeMap<Id, BTreeMap<String, String>>,
562        id: Id,
563        name: String,
564        value: String,
565        diff: StateDiff,
566    ) {
567        match diff {
568            StateDiff::Addition => {
569                map.entry(id).or_default().insert(name, value);
570            }
571            StateDiff::Retraction => {
572                if let Some(values) = map.get_mut(&id) {
573                    if values.get(&name) == Some(&value) {
574                        values.remove(&name);
575                        if values.is_empty() {
576                            map.remove(&id);
577                        }
578                    }
579                }
580            }
581        }
582    }
583
584    #[instrument(level = "debug")]
585    fn apply_cluster_update(
586        &mut self,
587        cluster: mz_catalog::durable::Cluster,
588        diff: StateDiff,
589        retractions: &mut InProgressRetractions,
590    ) {
591        // Soft signal for a managed cluster whose replica size isn't in
592        // the in-memory size map. The `mz_clusters` MV LEFT JOINs the size
593        // table and resolves `disk` to false when the size is missing, so
594        // this case no longer crashes a managed cluster with RF=0.
595        // A managed cluster with at least one running replica still panics
596        // via `concretize_replica_location` in `apply_cluster_replica_update`.
597        if matches!(diff, StateDiff::Addition) {
598            if let mz_catalog::durable::ClusterVariant::Managed(managed) = &cluster.config.variant {
599                if !self.cluster_replica_sizes.0.contains_key(&managed.size) {
600                    soft_panic_or_log!(
601                        "managed cluster {} ({}) references unknown replica size {:?}; \
602                         mz_clusters.disk will resolve to false",
603                        cluster.name,
604                        cluster.id,
605                        managed.size,
606                    );
607                }
608            }
609        }
610        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
611        apply_with_update(
612            &mut self.clusters_by_id,
613            cluster,
614            |cluster| cluster.id,
615            diff,
616            &mut retractions.clusters,
617        );
618    }
619
620    #[instrument(level = "debug")]
621    fn apply_network_policy_update(
622        &mut self,
623        policy: mz_catalog::durable::NetworkPolicy,
624        diff: StateDiff,
625        retractions: &mut InProgressRetractions,
626    ) {
627        apply_inverted_lookup(
628            &mut self.network_policies_by_name,
629            &policy.name,
630            policy.id,
631            diff,
632        );
633        apply_with_update(
634            &mut self.network_policies_by_id,
635            policy,
636            |policy| policy.id,
637            diff,
638            &mut retractions.network_policies,
639        );
640    }
641
642    #[instrument(level = "debug")]
643    fn apply_introspection_source_index_update(
644        &mut self,
645        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
646        diff: StateDiff,
647        retractions: &mut InProgressRetractions,
648    ) {
649        let cluster = self
650            .clusters_by_id
651            .get_mut(&introspection_source_index.cluster_id)
652            .expect("catalog out of sync");
653        let log = BUILTIN_LOG_LOOKUP
654            .get(introspection_source_index.name.as_str())
655            .expect("missing log");
656        apply_inverted_lookup(
657            &mut cluster.log_indexes,
658            &log.variant,
659            introspection_source_index.index_id,
660            diff,
661        );
662
663        match diff {
664            StateDiff::Addition => {
665                if let Some(mut entry) = retractions
666                    .introspection_source_indexes
667                    .remove(&introspection_source_index.item_id)
668                {
669                    // This should only happen during startup as a result of builtin migrations. We
670                    // create a new index item and replace the old one with it.
671                    let (index_name, index) = self.create_introspection_source_index(
672                        introspection_source_index.cluster_id,
673                        log,
674                        introspection_source_index.index_id,
675                    );
676                    assert_eq!(entry.id, introspection_source_index.item_id);
677                    assert_eq!(entry.oid, introspection_source_index.oid);
678                    assert_eq!(entry.name, index_name);
679                    entry.item = index;
680                    self.insert_entry(entry);
681                } else {
682                    self.insert_introspection_source_index(
683                        introspection_source_index.cluster_id,
684                        log,
685                        introspection_source_index.item_id,
686                        introspection_source_index.index_id,
687                        introspection_source_index.oid,
688                    );
689                }
690            }
691            StateDiff::Retraction => {
692                let entry = self.drop_item(introspection_source_index.item_id);
693                retractions
694                    .introspection_source_indexes
695                    .insert(entry.id, entry);
696            }
697        }
698    }
699
700    #[instrument(level = "debug")]
701    fn apply_cluster_replica_update(
702        &mut self,
703        cluster_replica: mz_catalog::durable::ClusterReplica,
704        diff: StateDiff,
705        _retractions: &mut InProgressRetractions,
706    ) {
707        let cluster = self
708            .clusters_by_id
709            .get(&cluster_replica.cluster_id)
710            .expect("catalog out of sync");
711
712        // Mirror the cluster-side soft signal: if a managed replica's size
713        // is no longer in the in-memory size map, skip in-memory
714        // registration rather than panicking. The mz_cluster_replicas MV
715        // resolves `disk` from mz_cluster_replica_size_internal (which
716        // retains rows for disabled sizes), so SQL still returns sensible
717        // results. We tolerate disabled sizes here (allow_disabled=true)
718        // because an existing replica must remain queryable even if the
719        // operator has since disabled its size.
720        if let mz_catalog::durable::ReplicaLocation::Managed { size, .. } =
721            &cluster_replica.config.location
722        {
723            if !self.cluster_replica_sizes.0.contains_key(size) {
724                soft_panic_or_log!(
725                    "cluster replica {}.{} ({}) references unknown replica size {:?}; \
726                     skipping in-memory registration",
727                    cluster.name,
728                    cluster_replica.name,
729                    cluster_replica.replica_id,
730                    size,
731                );
732                return;
733            }
734        }
735
736        // Pass no availability-zone override: a rebuild from the durable record
737        // keeps the AZ list the replica was provisioned under, which may differ
738        // from the cluster's current pool while a graceful reconfiguration is in
739        // flight — the cluster controller tells realized- from target-shape
740        // replicas by exactly this list.
741        let location = self
742            .concretize_replica_location(cluster_replica.config.location, &vec![], None, true)
743            .expect("catalog in unexpected state");
744        let cluster = self
745            .clusters_by_id
746            .get_mut(&cluster_replica.cluster_id)
747            .expect("catalog out of sync");
748        apply_inverted_lookup(
749            &mut cluster.replica_id_by_name_,
750            &cluster_replica.name,
751            cluster_replica.replica_id,
752            diff,
753        );
754        match diff {
755            StateDiff::Retraction => {
756                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
757                assert!(
758                    prev.is_some(),
759                    "retraction does not match existing value: {:?}",
760                    cluster_replica.replica_id
761                );
762            }
763            StateDiff::Addition => {
764                let logging = ReplicaLogging {
765                    log_logging: cluster_replica.config.logging.log_logging,
766                    interval: cluster_replica.config.logging.interval,
767                };
768                let config = ReplicaConfig {
769                    location,
770                    compute: ComputeReplicaConfig { logging },
771                };
772                let mem_cluster_replica = ClusterReplica {
773                    name: cluster_replica.name.clone(),
774                    cluster_id: cluster_replica.cluster_id,
775                    replica_id: cluster_replica.replica_id,
776                    config,
777                    owner_id: cluster_replica.owner_id,
778                };
779                let prev = cluster
780                    .replicas_by_id_
781                    .insert(cluster_replica.replica_id, mem_cluster_replica);
782                assert_eq!(
783                    prev, None,
784                    "values must be explicitly retracted before inserting a new value: {:?}",
785                    cluster_replica.replica_id
786                );
787            }
788        }
789    }
790
    /// Applies a single durable system-object-mapping change to the in-memory catalog.
    ///
    /// A system object mapping ties a builtin object (looked up in `BUILTIN_LOOKUP` by the
    /// mapping's `description`) to its catalog item id and global id. Mappings whose unique
    /// identifier is runtime-alterable are ignored here: those objects also have entries in the
    /// items collection and are handled by the regular `insert_item`/`drop_item` paths.
    ///
    /// - A retraction drops the item and parks the dropped entry in
    ///   `retractions.system_object_mappings`, keyed by item id.
    /// - An addition for an item with a parked retraction re-inserts the parked entry unchanged.
    /// - Any other addition builds the in-memory item from the compiled-in builtin definition
    ///   and inserts it, owned by `MZ_SYSTEM_ROLE_ID`. Connections are the exception: they use
    ///   the owner declared on the builtin.
    ///
    /// # Panics
    ///
    /// Panics in the following cases:
    /// - the builtin is missing from `BUILTIN_LOOKUP`;
    /// - its schema is not a known ambient schema;
    /// - the metrics retention cannot be converted into a compaction window;
    /// - a builtin index, materialized view, or connection fails to parse or parses to the wrong
    ///   item kind;
    /// - it is invoked for a builtin view (views are inserted elsewhere).
    #[instrument(level = "debug")]
    fn apply_system_object_mapping_update(
        &mut self,
        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        let item_id = system_object_mapping.unique_identifier.catalog_id;
        let global_id = system_object_mapping.unique_identifier.global_id;

        if system_object_mapping.unique_identifier.runtime_alterable() {
            // Runtime-alterable system objects have real entries in the items
            // collection and so get handled through the normal `insert_item`
            // and `drop_item` code paths.
            return;
        }

        if let StateDiff::Retraction = diff {
            // Park the dropped entry so that a matching addition in the same batch
            // can restore it (see below).
            let entry = self.drop_item(item_id);
            retractions.system_object_mappings.insert(item_id, entry);
            return;
        }

        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
            // This implies that we updated the fingerprint for some builtin item. The retraction
            // was parsed, planned, and optimized using the compiled in definition, not the
            // definition from a previous version. So we can just stick the old entry back into the
            // catalog.
            self.insert_entry(entry);
            return;
        }

        // Fresh addition: materialize the item from its compiled-in builtin definition.
        let builtin = BUILTIN_LOOKUP
            .get(&system_object_mapping.description)
            .expect("missing builtin")
            .1;
        // Builtins live in ambient (database-less) schemas.
        let schema_name = builtin.schema();
        let schema_id = self
            .ambient_schemas_by_name
            .get(schema_name)
            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
        let name = QualifiedItemName {
            qualifiers: ItemQualifiers {
                database_spec: ResolvedDatabaseSpecifier::Ambient,
                schema_spec: SchemaSpecifier::Id(*schema_id),
            },
            item: builtin.name().into(),
        };
        match builtin {
            Builtin::Log(log) => {
                // Logs use the `Source` object type for their owner privilege, followed by
                // the ACL items declared on the builtin.
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&log.access);
                self.insert_item(
                    item_id,
                    log.oid,
                    name.clone(),
                    CatalogItem::Log(Log {
                        variant: log.variant,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }

            Builtin::Table(table) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Table,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&table.access);

                // Builtin tables have no create SQL and a single root relation version. Every
                // column defaults to NULL for table writes.
                self.insert_item(
                    item_id,
                    table.oid,
                    name.clone(),
                    CatalogItem::Table(Table {
                        create_sql: None,
                        desc: VersionedRelationDesc::new(table.desc.clone()),
                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                        conn_id: None,
                        resolved_ids: ResolvedIds::empty(),
                        // Only retained-metrics objects get a custom compaction window, derived
                        // from the system config's metrics retention.
                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: table.is_retained_metrics_object,
                        data_source: TableDataSource::TableWrites {
                            defaults: vec![Expr::null(); table.desc.arity()],
                        },
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::Index(index) => {
                // Same retained-metrics compaction rule as for tables and sources.
                let custom_logical_compaction_window =
                    index.is_retained_metrics_object.then(|| {
                        self.system_config()
                            .metrics_retention()
                            .try_into()
                            .expect("invalid metrics retention")
                    });
                // Indexes can't be versioned.
                let versions = BTreeMap::new();

                // Builtin indexes are defined in SQL, so they go through the regular parser.
                let item = self
                    .parse_item(
                        global_id,
                        &index.create_sql(),
                        &versions,
                        None,
                        index.is_retained_metrics_object,
                        custom_logical_compaction_window,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap index:\n\
                                    {}\n\
                                    error:\n\
                                    {:?}\n\n\
                                    make sure that the schema name is specified in the builtin index's create sql statement.",
                            index.name, e
                        )
                    });
                let CatalogItem::Index(_) = item else {
                    panic!(
                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
                        index.name
                    );
                };

                // Builtin indexes are inserted with an empty privilege map.
                self.insert_item(
                    item_id,
                    index.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }
            Builtin::View(_) => {
                // parse_views is responsible for inserting all builtin views.
                unreachable!("views added elsewhere");
            }

            // Note: Element types must be loaded before array types.
            Builtin::Type(typ) => {
                let typ = self.resolve_builtin_type_references(typ);
                // For an array type, back-link the (already inserted) element type to this
                // array via `array_id`. This is why element types must be loaded first.
                if let CatalogType::Array { element_reference } = typ.details.typ {
                    let entry = self.get_entry_mut(&element_reference);
                    let item_type = match &mut entry.item {
                        CatalogItem::Type(item_type) => item_type,
                        _ => unreachable!("types can only reference other types"),
                    };
                    item_type.details.array_id = Some(item_id);
                }

                let schema_id = self.resolve_system_schema(typ.schema);

                // Types get the default builtin privilege in addition to the owner privilege.
                self.insert_item(
                    item_id,
                    typ.oid,
                    QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(schema_id),
                        },
                        item: typ.name.to_owned(),
                    },
                    CatalogItem::Type(Type {
                        create_sql: None,
                        global_id,
                        details: typ.details.clone(),
                        resolved_ids: ResolvedIds::empty(),
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(vec![
                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
                    ]),
                );
            }

            Builtin::Func(func) => {
                // This OID is never used. `func` has a `Vec` of implementations and
                // each implementation has its own OID. Those are the OIDs that are
                // actually used by the system.
                let oid = INVALID_OID;
                self.insert_item(
                    item_id,
                    oid,
                    name.clone(),
                    CatalogItem::Func(Func {
                        inner: func.inner,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }

            Builtin::Source(coll) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&coll.access);

                // Builtin sources have no create SQL and are placed on the epoch-milliseconds
                // timeline.
                self.insert_item(
                    item_id,
                    coll.oid,
                    name.clone(),
                    CatalogItem::Source(Source {
                        create_sql: None,
                        data_source: coll.data_source.clone(),
                        desc: coll.desc.clone(),
                        global_id,
                        timeline: Timeline::EpochMilliseconds,
                        resolved_ids: ResolvedIds::empty(),
                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: coll.is_retained_metrics_object,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::MaterializedView(mv) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::MaterializedView,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&mv.access);

                let custom_logical_compaction_window = mv.is_retained_metrics_object.then(|| {
                    self.system_config()
                        .metrics_retention()
                        .try_into()
                        .expect("invalid metrics retention")
                });

                // Builtin materialized views can't be versioned.
                let versions = BTreeMap::new();

                let mut item = self
                    .parse_item(
                        global_id,
                        &mv.create_sql(),
                        &versions,
                        None,
                        mv.is_retained_metrics_object,
                        custom_logical_compaction_window,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap materialized view:\n\
                             {}\n\
                             error:\n\
                             {e:?}\n\n\
                             make sure that the schema name is specified in the builtin \
                             materialized view's create sql statement.",
                            mv.name,
                        )
                    });
                let CatalogItem::MaterializedView(catalog_mv) = &mut item else {
                    panic!(
                        "internal error: builtin materialized view {}'s SQL does not begin \
                         with \"CREATE MATERIALIZED VIEW\".",
                        mv.name,
                    );
                };

                // The optimizer can only infer keys from MV definitions, but cannot infer
                // uniqueness present in the input data. Extend with the keys declared in the
                // builtin definition, to allow supplying additional key knowledge.
                let mut desc = catalog_mv.desc.latest();
                for key in &mv.desc.typ().keys {
                    desc = desc.with_key(key.clone());
                }
                catalog_mv.desc = VersionedRelationDesc::new(desc);

                self.insert_item(
                    item_id,
                    mv.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::Connection(connection) => {
                // Connections can't be versioned.
                let versions = BTreeMap::new();
                let mut item = self
                    .parse_item(
                        global_id,
                        connection.sql,
                        &versions,
                        None,
                        false,
                        None,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap connection:\n\
                                    {}\n\
                                    error:\n\
                                    {:?}\n\n\
                                    make sure that the schema name is specified in the builtin connection's create sql statement.",
                            connection.name, e
                        )
                    });
                let CatalogItem::Connection(_) = &mut item else {
                    panic!(
                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
                        connection.name
                    );
                };

                // Unlike the other builtins, connections are owned by (and grant the owner
                // privilege to) the owner declared on the builtin, not `MZ_SYSTEM_ROLE_ID`.
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    connection.owner_id.clone(),
                )];
                acl_items.extend_from_slice(connection.access);

                self.insert_item(
                    item_id,
                    connection.oid,
                    name.clone(),
                    item,
                    connection.owner_id.clone(),
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
        }
    }
1148
1149    #[instrument(level = "debug")]
1150    fn apply_temporary_item_update(
1151        &mut self,
1152        temporary_item: TemporaryItem,
1153        diff: StateDiff,
1154        retractions: &mut InProgressRetractions,
1155        local_expression_cache: &mut LocalExpressionCache,
1156    ) {
1157        match diff {
1158            StateDiff::Addition => {
1159                let TemporaryItem {
1160                    id,
1161                    oid,
1162                    global_id,
1163                    schema_id,
1164                    name,
1165                    conn_id,
1166                    create_sql,
1167                    owner_id,
1168                    privileges,
1169                    extra_versions,
1170                } = temporary_item;
1171                // Lazily create the temporary schema if it doesn't exist yet.
1172                // We need the conn_id to create the schema, and it should always be Some for temp items.
1173                let temp_conn_id = conn_id
1174                    .as_ref()
1175                    .expect("temporary items must have a connection id");
1176                if !self.temporary_schemas.contains_key(temp_conn_id) {
1177                    self.create_temporary_schema(temp_conn_id, owner_id)
1178                        .expect("failed to create temporary schema");
1179                }
1180                let schema = self.find_temp_schema(&schema_id);
1181                let name = QualifiedItemName {
1182                    qualifiers: ItemQualifiers {
1183                        database_spec: schema.database().clone(),
1184                        schema_spec: schema.id().clone(),
1185                    },
1186                    item: name.clone(),
1187                };
1188
1189                let entry = match retractions.temp_items.remove(&id) {
1190                    Some(mut retraction) => {
1191                        assert_eq!(retraction.id, id);
1192
1193                        // We only reparse the SQL if it's changed. Otherwise, we use the existing
1194                        // item. This is a performance optimization and not needed for correctness.
1195                        // This makes it difficult to use the `UpdateFrom` trait, but the structure
1196                        // is still the same as the trait.
1197                        if retraction.create_sql() != create_sql {
1198                            let mut catalog_item = self
1199                                .deserialize_item(
1200                                    global_id,
1201                                    &create_sql,
1202                                    &extra_versions,
1203                                    local_expression_cache,
1204                                    Some(retraction.item),
1205                                )
1206                                .unwrap_or_else(|e| {
1207                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
1208                                });
1209                            // Have to patch up the item because parsing doesn't
1210                            // take into account temporary schemas/conn_id.
1211                            // NOTE(aljoscha): I don't like how we're patching
1212                            // this in here, but it's but one of the ways in
1213                            // which temporary items are a bit weird. So, here
1214                            // we are ...
1215                            catalog_item.set_conn_id(conn_id);
1216                            retraction.item = catalog_item;
1217                        }
1218
1219                        retraction.id = id;
1220                        retraction.oid = oid;
1221                        retraction.name = name;
1222                        retraction.owner_id = owner_id;
1223                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1224                        retraction
1225                    }
1226                    None => {
1227                        let mut catalog_item = self
1228                            .deserialize_item(
1229                                global_id,
1230                                &create_sql,
1231                                &extra_versions,
1232                                local_expression_cache,
1233                                None,
1234                            )
1235                            .unwrap_or_else(|e| {
1236                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1237                            });
1238
1239                        // Have to patch up the item because parsing doesn't
1240                        // take into account temporary schemas/conn_id.
1241                        // NOTE(aljoscha): I don't like how we're patching this
1242                        // in here, but it's but one of the ways in which
1243                        // temporary items are a bit weird. So, here we are ...
1244                        catalog_item.set_conn_id(conn_id);
1245
1246                        CatalogEntry {
1247                            item: catalog_item,
1248                            referenced_by: Vec::new(),
1249                            used_by: Vec::new(),
1250                            id,
1251                            oid,
1252                            name,
1253                            owner_id,
1254                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1255                        }
1256                    }
1257                };
1258                self.insert_entry(entry);
1259            }
1260            StateDiff::Retraction => {
1261                let entry = self.drop_item(temporary_item.id);
1262                retractions.temp_items.insert(temporary_item.id, entry);
1263            }
1264        }
1265    }
1266
1267    #[instrument(level = "debug")]
1268    fn apply_item_update(
1269        &mut self,
1270        item: mz_catalog::durable::Item,
1271        diff: StateDiff,
1272        retractions: &mut InProgressRetractions,
1273        local_expression_cache: &mut LocalExpressionCache,
1274    ) -> Result<(), CatalogError> {
1275        match diff {
1276            StateDiff::Addition => {
1277                let key = item.key();
1278                let mz_catalog::durable::Item {
1279                    id,
1280                    oid,
1281                    global_id,
1282                    schema_id,
1283                    name,
1284                    create_sql,
1285                    owner_id,
1286                    privileges,
1287                    extra_versions,
1288                } = item;
1289                let schema = self.find_non_temp_schema(&schema_id);
1290                let name = QualifiedItemName {
1291                    qualifiers: ItemQualifiers {
1292                        database_spec: schema.database().clone(),
1293                        schema_spec: schema.id().clone(),
1294                    },
1295                    item: name.clone(),
1296                };
1297                let entry = match retractions.items.remove(&key) {
1298                    Some(retraction) => {
1299                        assert_eq!(retraction.id, item.id);
1300
1301                        let item = self
1302                            .deserialize_item(
1303                                global_id,
1304                                &create_sql,
1305                                &extra_versions,
1306                                local_expression_cache,
1307                                Some(retraction.item),
1308                            )
1309                            .unwrap_or_else(|e| {
1310                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1311                            });
1312
1313                        CatalogEntry {
1314                            item,
1315                            id,
1316                            oid,
1317                            name,
1318                            owner_id,
1319                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1320                            referenced_by: retraction.referenced_by,
1321                            used_by: retraction.used_by,
1322                        }
1323                    }
1324                    None => {
1325                        let catalog_item = self
1326                            .deserialize_item(
1327                                global_id,
1328                                &create_sql,
1329                                &extra_versions,
1330                                local_expression_cache,
1331                                None,
1332                            )
1333                            .unwrap_or_else(|e| {
1334                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1335                            });
1336                        CatalogEntry {
1337                            item: catalog_item,
1338                            referenced_by: Vec::new(),
1339                            used_by: Vec::new(),
1340                            id,
1341                            oid,
1342                            name,
1343                            owner_id,
1344                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1345                        }
1346                    }
1347                };
1348
1349                self.insert_entry(entry);
1350            }
1351            StateDiff::Retraction => {
1352                let entry = self.drop_item(item.id);
1353                let key = item.into_key_value().0;
1354                retractions.items.insert(key, entry);
1355            }
1356        }
1357        Ok(())
1358    }
1359
1360    #[instrument(level = "debug")]
1361    fn apply_comment_update(
1362        &mut self,
1363        comment: mz_catalog::durable::Comment,
1364        diff: StateDiff,
1365        _retractions: &mut InProgressRetractions,
1366    ) {
1367        match diff {
1368            StateDiff::Addition => {
1369                let prev = Arc::make_mut(&mut self.comments).update_comment(
1370                    comment.object_id,
1371                    comment.sub_component,
1372                    Some(comment.comment),
1373                );
1374                assert_eq!(
1375                    prev, None,
1376                    "values must be explicitly retracted before inserting a new value"
1377                );
1378            }
1379            StateDiff::Retraction => {
1380                let prev = Arc::make_mut(&mut self.comments).update_comment(
1381                    comment.object_id,
1382                    comment.sub_component,
1383                    None,
1384                );
1385                assert_eq!(
1386                    prev,
1387                    Some(comment.comment),
1388                    "retraction does not match existing value: ({:?}, {:?})",
1389                    comment.object_id,
1390                    comment.sub_component,
1391                );
1392            }
1393        }
1394    }
1395
1396    #[instrument(level = "debug")]
1397    fn apply_source_references_update(
1398        &mut self,
1399        source_references: mz_catalog::durable::SourceReferences,
1400        diff: StateDiff,
1401        _retractions: &mut InProgressRetractions,
1402    ) {
1403        match diff {
1404            StateDiff::Addition => {
1405                let prev = self
1406                    .source_references
1407                    .insert(source_references.source_id, source_references.into());
1408                assert!(
1409                    prev.is_none(),
1410                    "values must be explicitly retracted before inserting a new value: {prev:?}"
1411                );
1412            }
1413            StateDiff::Retraction => {
1414                let prev = self.source_references.remove(&source_references.source_id);
1415                assert!(
1416                    prev.is_some(),
1417                    "retraction for a non-existent existing value: {source_references:?}"
1418                );
1419            }
1420        }
1421    }
1422
1423    #[instrument(level = "debug")]
1424    fn apply_storage_collection_metadata_update(
1425        &mut self,
1426        storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1427        diff: StateDiff,
1428        _retractions: &mut InProgressRetractions,
1429    ) {
1430        apply_inverted_lookup(
1431            &mut Arc::make_mut(&mut self.storage_metadata).collection_metadata,
1432            &storage_collection_metadata.id,
1433            storage_collection_metadata.shard,
1434            diff,
1435        );
1436    }
1437
1438    #[instrument(level = "debug")]
1439    fn apply_unfinalized_shard_update(
1440        &mut self,
1441        unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1442        diff: StateDiff,
1443        _retractions: &mut InProgressRetractions,
1444    ) {
1445        match diff {
1446            StateDiff::Addition => {
1447                let newly_inserted = Arc::make_mut(&mut self.storage_metadata)
1448                    .unfinalized_shards
1449                    .insert(unfinalized_shard.shard);
1450                assert!(
1451                    newly_inserted,
1452                    "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1453                );
1454            }
1455            StateDiff::Retraction => {
1456                let removed = Arc::make_mut(&mut self.storage_metadata)
1457                    .unfinalized_shards
1458                    .remove(&unfinalized_shard.shard);
1459                assert!(
1460                    removed,
1461                    "retraction does not match existing value: {unfinalized_shard:?}"
1462                );
1463            }
1464        }
1465    }
1466
1467    /// Generate a list of `BuiltinTableUpdate`s that correspond to a list of updates made to the
1468    /// durable catalog.
1469    #[instrument]
1470    pub(crate) fn generate_builtin_table_updates(
1471        &self,
1472        updates: Vec<StateUpdate>,
1473    ) -> Vec<BuiltinTableUpdate> {
1474        let mut builtin_table_updates = Vec::new();
1475        for StateUpdate { kind, ts: _, diff } in updates {
1476            let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1477            let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1478            builtin_table_updates.extend(builtin_table_update);
1479        }
1480        builtin_table_updates
1481    }
1482
1483    /// Generate a list of `BuiltinTableUpdate`s that correspond to a single update made to the
1484    /// durable catalog.
1485    #[instrument(level = "debug")]
1486    pub(crate) fn generate_builtin_table_update(
1487        &self,
1488        kind: StateUpdateKind,
1489        diff: StateDiff,
1490    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1491        let diff = diff.into();
1492        match kind {
1493            // mz_roles and mz_role_parameters are MaterializedViews backed by
1494            // mz_internal.mz_catalog_raw, so role rows do not produce builtin
1495            // table updates here.
1496            StateUpdateKind::Role(_) => Vec::new(),
1497            StateUpdateKind::RoleAuth(role_auth) => {
1498                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1499            }
1500            // mz_default_privileges and mz_system_privileges are MaterializedViews
1501            // backed by mz_internal.mz_catalog_raw, so privilege rows do not
1502            // produce builtin table updates here.
1503            StateUpdateKind::DefaultPrivilege(_) => Vec::new(),
1504            StateUpdateKind::SystemPrivilege(_) => Vec::new(),
1505            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1506            // mz_internal.mz_{cluster,replica}_system_parameters are
1507            // MaterializedViews backed by mz_internal.mz_catalog_raw, so the
1508            // durable scoped-configuration rows do not produce builtin table
1509            // updates here. (The in-memory working copy used for resolution is
1510            // maintained separately in `apply_*_system_configuration_update`.)
1511            StateUpdateKind::ClusterSystemConfiguration(_) => Vec::new(),
1512            StateUpdateKind::ReplicaSystemConfiguration(_) => Vec::new(),
1513            // mz_clusters and mz_cluster_schedules are MaterializedViews backed
1514            // by mz_internal.mz_catalog_raw, so cluster rows do not produce
1515            // builtin table updates here.
1516            StateUpdateKind::Cluster(_) => Vec::new(),
1517            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1518                self.pack_item_update(introspection_source_index.item_id, diff)
1519            }
1520            // mz_cluster_replicas is a MaterializedView backed by
1521            // mz_internal.mz_catalog_raw.
1522            StateUpdateKind::ClusterReplica(_) => Vec::new(),
1523            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1524                // Runtime-alterable system objects have real entries in the
1525                // items collection and so get handled through the normal
1526                // `StateUpdateKind::Item`.`
1527                if !system_object_mapping.unique_identifier.runtime_alterable() {
1528                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1529                } else {
1530                    vec![]
1531                }
1532            }
1533            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1534            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1535            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1536                comment.object_id,
1537                comment.sub_component,
1538                &comment.comment,
1539                diff,
1540            )],
1541            StateUpdateKind::SourceReferences(source_references) => {
1542                self.pack_source_references_update(&source_references, diff)
1543            }
1544            StateUpdateKind::AuditLog(audit_log) => {
1545                vec![
1546                    self.pack_audit_log_update(&audit_log.event, diff)
1547                        .expect("could not pack audit log update"),
1548                ]
1549            }
1550            StateUpdateKind::Database(_)
1551            | StateUpdateKind::Schema(_)
1552            | StateUpdateKind::NetworkPolicy(_)
1553            | StateUpdateKind::StorageCollectionMetadata(_)
1554            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1555        }
1556    }
1557
1558    fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1559        self.entry_by_id
1560            .get_mut(id)
1561            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1562    }
1563
1564    /// Set the optimized plan for the item identified by `id`.
1565    ///
1566    /// # Panics
1567    /// If the item is not an `Index` or `MaterializedView`.
1568    pub(super) fn set_optimized_plan(
1569        &mut self,
1570        id: GlobalId,
1571        plan: DataflowDescription<mz_expr::OptimizedMirRelationExpr>,
1572    ) {
1573        let item_id = self.entry_by_global_id[&id];
1574        let entry = self.get_entry_mut(&item_id);
1575        match entry.item_mut() {
1576            CatalogItem::Index(idx) => idx.optimized_plan = Some(Arc::new(plan)),
1577            CatalogItem::MaterializedView(mv) => mv.optimized_plan = Some(Arc::new(plan)),
1578            other => panic!("set_optimized_plan called on {} ({:?})", id, other.typ()),
1579        }
1580    }
1581
1582    /// Set the physical plan for the item identified by `id`.
1583    ///
1584    /// # Panics
1585    /// If the item is not an `Index` or `MaterializedView`.
1586    pub(super) fn set_physical_plan(
1587        &mut self,
1588        id: GlobalId,
1589        plan: DataflowDescription<mz_compute_types::plan::Plan>,
1590    ) {
1591        let item_id = self.entry_by_global_id[&id];
1592        let entry = self.get_entry_mut(&item_id);
1593        match entry.item_mut() {
1594            CatalogItem::Index(idx) => idx.physical_plan = Some(Arc::new(plan)),
1595            CatalogItem::MaterializedView(mv) => mv.physical_plan = Some(Arc::new(plan)),
1596            other => panic!("set_physical_plan called on {} ({:?})", id, other.typ()),
1597        }
1598    }
1599
1600    /// Set the `DataflowMetainfo` for the item identified by `id`.
1601    ///
1602    /// # Panics
1603    /// If the item is not an `Index` or `MaterializedView`.
1604    pub(super) fn set_dataflow_metainfo(
1605        &mut self,
1606        id: GlobalId,
1607        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
1608    ) {
1609        // Add entries to the `notices_by_dep_id` lookup map.
1610        for notice in metainfo.optimizer_notices.iter() {
1611            for dep_id in notice.dependencies.iter() {
1612                self.notices_by_dep_id
1613                    .entry(*dep_id)
1614                    .or_default()
1615                    .push(Arc::clone(notice));
1616            }
1617            if let Some(item_id) = notice.item_id {
1618                soft_assert_eq_or_log!(
1619                    item_id,
1620                    id,
1621                    "notice.item_id should match the id for whom we are saving the notice"
1622                );
1623            }
1624        }
1625        // Set the metainfo on the catalog object.
1626        let item_id = self.entry_by_global_id[&id];
1627        let entry = self.get_entry_mut(&item_id);
1628        match entry.item_mut() {
1629            CatalogItem::Index(idx) => idx.dataflow_metainfo = Some(metainfo),
1630            CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo = Some(metainfo),
1631            other => panic!("set_dataflow_metainfo called on {} ({:?})", id, other.typ()),
1632        }
1633    }
1634
    /// Clean up optimizer notices for the given dropped catalog entries.
    ///
    /// This extracts notices directly from the owned entries (which
    /// have already been removed from the catalog maps), cleans up
    /// the `notices_by_dep_id` reverse index, and removes notices
    /// from other (still-live) catalog objects that depended on the
    /// dropped items.
    ///
    /// The cleanup runs in three passes:
    /// 1. Notices stored on the dropped entries themselves (only `Index` and
    ///    `MaterializedView` items carry `dataflow_metainfo`) are taken out of
    ///    the entries and unlinked from `notices_by_dep_id`.
    /// 2. Every `notices_by_dep_id` bucket keyed by a global ID of a dropped
    ///    entry is removed. Its notices are stripped from the items they are
    ///    attached to, if those items still exist.
    /// 3. Any dropped notice may still be listed under dependency IDs that are
    ///    *not* being dropped; those buckets are filtered as well.
    ///
    /// Returns the set of all dropped notices for builtin table
    /// retraction.
    #[mz_ore::instrument(level = "trace")]
    pub(super) fn drop_optimizer_notices(
        &mut self,
        dropped_entries: Vec<CatalogEntry>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        // Every notice removed by this call; this is the return value.
        let mut dropped_notices = BTreeSet::new();
        // The global IDs of all dropped entries (an entry may contribute several).
        let mut drop_ids = BTreeSet::new();

        // Pass 1: extract notices directly from the owned dropped entries.
        for mut entry in dropped_entries {
            drop_ids.extend(entry.global_ids());
            // Only indexes and materialized views carry a `dataflow_metainfo`.
            let metainfo = match entry.item_mut() {
                CatalogItem::Index(idx) => idx.dataflow_metainfo.take(),
                CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.take(),
                _ => None,
            };
            if let Some(mut metainfo) = metainfo {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by \
                     `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    // Clean up notices_by_dep_id for this notice's
                    // dependencies.
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.notices_by_dep_id.get_mut(dep_id) {
                            notices.retain(|x| &n != x);
                            // Drop empty buckets so the map doesn't accumulate
                            // keys with no notices.
                            if notices.is_empty() {
                                self.notices_by_dep_id.remove(dep_id);
                            }
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Pass 2: remove notices_by_dep_id entries keyed by the dropped IDs.
        // These are notices on OTHER items that depend on a dropped
        // item. We need to remove them from those items' metainfo.
        for id in &drop_ids {
            if let Some(notices) = self.notices_by_dep_id.remove(id) {
                for n in notices.into_iter() {
                    // Remove the notice from the catalog object it
                    // lives on (if that object still exists — it
                    // may have been dropped too, in which case the
                    // notice was already collected above).
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(entry) = self.try_get_entry_by_global_id(item_id) {
                            // Resolve the global ID to a `CatalogItemId` first, so the shared
                            // borrow from the lookup ends before taking a mutable one.
                            let catalog_item_id = entry.id();
                            let entry = self.get_entry_mut(&catalog_item_id);
                            let item = entry.item_mut();
                            match item {
                                CatalogItem::Index(idx) => {
                                    if let Some(ref mut m) = idx.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                CatalogItem::MaterializedView(mv) => {
                                    if let Some(ref mut m) = mv.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                _ => {}
                            }
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Pass 3: clean up notices_by_dep_id entries for dependency IDs
        // that are NOT being dropped but had dropped notices. (Buckets keyed
        // by dropped IDs were already removed wholesale in pass 2.)
        let todo_dep_ids: BTreeSet<GlobalId> = dropped_notices
            .iter()
            .flat_map(|n| n.dependencies.iter())
            .filter(|dep_id| !drop_ids.contains(dep_id))
            .copied()
            .collect();
        for id in todo_dep_ids {
            if let Some(notices) = self.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n));
                if notices.is_empty() {
                    self.notices_by_dep_id.remove(&id);
                }
            }
        }

        dropped_notices
    }
1737
1738    fn get_schema_mut(
1739        &mut self,
1740        database_spec: &ResolvedDatabaseSpecifier,
1741        schema_spec: &SchemaSpecifier,
1742        conn_id: &ConnectionId,
1743    ) -> &mut Schema {
1744        // Keep in sync with `get_schemas`
1745        match (database_spec, schema_spec) {
1746            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1747                .temporary_schemas
1748                .get_mut(conn_id)
1749                .expect("catalog out of sync"),
1750            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1751                .ambient_schemas_by_id
1752                .get_mut(id)
1753                .expect("catalog out of sync"),
1754            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1755                .database_by_id
1756                .get_mut(database_id)
1757                .expect("catalog out of sync")
1758                .schemas_by_id
1759                .get_mut(schema_id)
1760                .expect("catalog out of sync"),
1761            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1762                unreachable!("temporary schemas are in the ambient database")
1763            }
1764        }
1765    }
1766
1767    /// Install builtin views to the catalog. This is its own function so that views can be
1768    /// optimized in parallel.
1769    ///
1770    /// The implementation is similar to `apply_updates_for_bootstrap` and determines dependency
1771    /// problems by sniffing out specific errors and then retrying once those dependencies are
1772    /// complete. This doesn't work for everything (casts, function implementations) so we also need
1773    /// to have a bucket for everything at the end. Additionally, because this executes in parellel,
1774    /// we must maintain a completed set otherwise races could result in orphaned views languishing
1775    /// in awaiting with nothing retriggering the attempt.
1776    #[instrument(name = "catalog::parse_views")]
1777    async fn parse_builtin_views(
1778        state: &mut CatalogState,
1779        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
1780        retractions: &mut InProgressRetractions,
1781        local_expression_cache: &mut LocalExpressionCache,
1782    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1783        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
1784        let (updates, additions): (Vec<_>, Vec<_>) =
1785            builtin_views
1786                .into_iter()
1787                .partition_map(|(view, item_id, gid)| {
1788                    match retractions.system_object_mappings.remove(&item_id) {
1789                        Some(entry) => Either::Left(entry),
1790                        None => Either::Right((view, item_id, gid)),
1791                    }
1792                });
1793
1794        for entry in updates {
1795            // This implies that we updated the fingerprint for some builtin view. The retraction
1796            // was parsed, planned, and optimized using the compiled in definition, not the
1797            // definition from a previous version. So we can just stick the old entry back into the
1798            // catalog.
1799            let item_id = entry.id();
1800            state.insert_entry(entry);
1801            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
1802        }
1803
1804        let mut handles = Vec::new();
1805        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
1806            BTreeMap::new();
1807        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
1808        // Some errors are due to the implementation of casts or SQL functions that depend on some
1809        // view. Instead of figuring out the exact view dependency, delay these until the end.
1810        let mut awaiting_all = Vec::new();
1811        // Completed views, needed to avoid race conditions.
1812        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
1813        let mut completed_names: BTreeSet<String> = BTreeSet::new();
1814
1815        // Avoid some reference lifetime issues by not passing `builtin` into the spawned task.
1816        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
1817            .into_iter()
1818            .map(|(view, item_id, gid)| (item_id, (view, gid)))
1819            .collect();
1820        let item_ids: Vec<_> = views.keys().copied().collect();
1821
1822        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
1823        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
1824            if handles.is_empty() && ready.is_empty() {
1825                // Enqueue the views that were waiting for all the others.
1826                ready.extend(awaiting_all.drain(..));
1827            }
1828
1829            // Spawn tasks for all ready views.
1830            if !ready.is_empty() {
1831                let spawn_state = Arc::new(state.clone());
1832                while let Some(id) = ready.pop_front() {
1833                    let (view, global_id) = views.get(&id).expect("must exist");
1834                    let global_id = *global_id;
1835                    let create_sql = view.create_sql();
1836                    // Views can't be versioned.
1837                    let versions = BTreeMap::new();
1838
1839                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
1840                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
1841                    let task_state = Arc::clone(&spawn_state);
1842                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1843                    let handle = mz_ore::task::spawn_blocking(
1844                        || "parse view",
1845                        move || {
1846                            span.in_scope(|| {
1847                                let res = task_state.parse_item_inner(
1848                                    global_id,
1849                                    &create_sql,
1850                                    &versions,
1851                                    None,
1852                                    false,
1853                                    None,
1854                                    cached_expr,
1855                                    None,
1856                                );
1857                                (id, global_id, res)
1858                            })
1859                        },
1860                    );
1861                    handles.push(handle);
1862                }
1863            }
1864
1865            // Wait for a view to be ready.
1866            let (selected, _idx, remaining) = future::select_all(handles).await;
1867            handles = remaining;
1868            let (id, global_id, res) = selected;
1869            let mut insert_cached_expr = |cached_expr| {
1870                if let Some(cached_expr) = cached_expr {
1871                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
1872                }
1873            };
1874            match res {
1875                Ok((item, uncached_expr)) => {
1876                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1877                        local_expression_cache.insert_uncached_expression(
1878                            global_id,
1879                            uncached_expr,
1880                            optimizer_features,
1881                        );
1882                    }
1883                    // Add item to catalog.
1884                    let (view, _gid) = views.remove(&id).expect("must exist");
1885                    let schema_id = state
1886                        .ambient_schemas_by_name
1887                        .get(view.schema)
1888                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
1889                    let qname = QualifiedItemName {
1890                        qualifiers: ItemQualifiers {
1891                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1892                            schema_spec: SchemaSpecifier::Id(*schema_id),
1893                        },
1894                        item: view.name.into(),
1895                    };
1896                    let mut acl_items = vec![rbac::owner_privilege(
1897                        mz_sql::catalog::ObjectType::View,
1898                        MZ_SYSTEM_ROLE_ID,
1899                    )];
1900                    acl_items.extend_from_slice(&view.access);
1901
1902                    state.insert_item(
1903                        id,
1904                        view.oid,
1905                        qname,
1906                        item,
1907                        MZ_SYSTEM_ROLE_ID,
1908                        PrivilegeMap::from_mz_acl_items(acl_items),
1909                    );
1910
1911                    // Enqueue any items waiting on this dependency.
1912                    let mut resolved_dependent_items = Vec::new();
1913                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
1914                        resolved_dependent_items.extend(dependent_items);
1915                    }
1916                    let entry = state.get_entry(&id);
1917                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
1918                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
1919                        resolved_dependent_items.extend(dependent_items);
1920                    }
1921                    ready.extend(resolved_dependent_items);
1922
1923                    completed_ids.insert(id);
1924                    completed_names.insert(full_name);
1925                }
1926                // If we were missing a dependency, wait for it to be added.
1927                Err((
1928                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
1929                    cached_expr,
1930                )) => {
1931                    insert_cached_expr(cached_expr);
1932                    if completed_ids.contains(&missing_dep) {
1933                        ready.push_back(id);
1934                    } else {
1935                        awaiting_id_dependencies
1936                            .entry(missing_dep)
1937                            .or_default()
1938                            .push(id);
1939                    }
1940                }
1941                // If we were missing a dependency, wait for it to be added.
1942                Err((
1943                    AdapterError::PlanError(plan::PlanError::Catalog(
1944                        SqlCatalogError::UnknownItem(missing_dep),
1945                    )),
1946                    cached_expr,
1947                )) => {
1948                    insert_cached_expr(cached_expr);
1949                    match CatalogItemId::from_str(&missing_dep) {
1950                        Ok(missing_dep) => {
1951                            if completed_ids.contains(&missing_dep) {
1952                                ready.push_back(id);
1953                            } else {
1954                                awaiting_id_dependencies
1955                                    .entry(missing_dep)
1956                                    .or_default()
1957                                    .push(id);
1958                            }
1959                        }
1960                        Err(_) => {
1961                            if completed_names.contains(&missing_dep) {
1962                                ready.push_back(id);
1963                            } else {
1964                                awaiting_name_dependencies
1965                                    .entry(missing_dep)
1966                                    .or_default()
1967                                    .push(id);
1968                            }
1969                        }
1970                    }
1971                }
1972                Err((
1973                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
1974                    cached_expr,
1975                )) => {
1976                    insert_cached_expr(cached_expr);
1977                    awaiting_all.push(id);
1978                }
1979                Err((e, _)) => {
1980                    let (bad_view, _gid) = views.get(&id).expect("must exist");
1981                    panic!(
1982                        "internal error: failed to load bootstrap view:\n\
1983                            {name}\n\
1984                            error:\n\
1985                            {e:?}\n\n\
1986                            Make sure that the schema name is specified in the builtin view's create sql statement.
1987                            ",
1988                        name = bad_view.name,
1989                    )
1990                }
1991            }
1992        }
1993
1994        assert!(awaiting_id_dependencies.is_empty());
1995        assert!(
1996            awaiting_name_dependencies.is_empty(),
1997            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
1998        );
1999        assert!(awaiting_all.is_empty());
2000        assert!(views.is_empty());
2001
2002        // Generate a builtin table update for all the new views.
2003        builtin_table_updates.extend(
2004            item_ids
2005                .into_iter()
2006                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
2007        );
2008
2009        builtin_table_updates
2010    }
2011
2012    /// Associates a name, `CatalogItemId`, and entry.
2013    fn insert_entry(&mut self, entry: CatalogEntry) {
2014        if !entry.id.is_system() {
2015            if let Some(cluster_id) = entry.item.cluster_id() {
2016                self.clusters_by_id
2017                    .get_mut(&cluster_id)
2018                    .expect("catalog out of sync")
2019                    .bound_objects
2020                    .insert(entry.id);
2021            };
2022        }
2023
2024        for u in entry.references().items() {
2025            match self.entry_by_id.get_mut(u) {
2026                Some(metadata) => metadata.referenced_by.push(entry.id()),
2027                None => panic!(
2028                    "Catalog: missing dependent catalog item {} while installing {}",
2029                    &u,
2030                    self.resolve_full_name(entry.name(), entry.conn_id())
2031                ),
2032            }
2033        }
2034        for u in entry.uses() {
2035            // Ignore self for self-referential tasks (e.g. Continual Tasks), if
2036            // present.
2037            if u == entry.id() {
2038                continue;
2039            }
2040            match self.entry_by_id.get_mut(&u) {
2041                Some(metadata) => metadata.used_by.push(entry.id()),
2042                None => panic!(
2043                    "Catalog: missing dependent catalog item {} while installing {}",
2044                    &u,
2045                    self.resolve_full_name(entry.name(), entry.conn_id())
2046                ),
2047            }
2048        }
2049        for gid in entry.item.global_ids() {
2050            self.entry_by_global_id.insert(gid, entry.id());
2051        }
2052        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
2053        // Lazily create the temporary schema if this is a temporary item and the schema
2054        // doesn't exist yet.
2055        if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
2056            && !self.temporary_schemas.contains_key(conn_id)
2057        {
2058            self.create_temporary_schema(conn_id, entry.owner_id)
2059                .expect("failed to create temporary schema");
2060        }
2061        let schema = self.get_schema_mut(
2062            &entry.name().qualifiers.database_spec,
2063            &entry.name().qualifiers.schema_spec,
2064            conn_id,
2065        );
2066
2067        let prev_id = match entry.item() {
2068            CatalogItem::Func(_) => schema
2069                .functions
2070                .insert(entry.name().item.clone(), entry.id()),
2071            CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
2072            _ => schema.items.insert(entry.name().item.clone(), entry.id()),
2073        };
2074
2075        assert!(
2076            prev_id.is_none(),
2077            "builtin name collision on {:?}",
2078            entry.name().item.clone()
2079        );
2080
2081        self.entry_by_id.insert(entry.id(), entry.clone());
2082    }
2083
2084    /// Associates a name, [`CatalogItemId`], and entry.
2085    fn insert_item(
2086        &mut self,
2087        id: CatalogItemId,
2088        oid: u32,
2089        name: QualifiedItemName,
2090        item: CatalogItem,
2091        owner_id: RoleId,
2092        privileges: PrivilegeMap,
2093    ) {
2094        let entry = CatalogEntry {
2095            item,
2096            name,
2097            id,
2098            oid,
2099            used_by: Vec::new(),
2100            referenced_by: Vec::new(),
2101            owner_id,
2102            privileges,
2103        };
2104
2105        self.insert_entry(entry);
2106    }
2107
2108    #[mz_ore::instrument(level = "trace")]
2109    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
2110        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
2111        for u in metadata.references().items() {
2112            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
2113                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
2114            }
2115        }
2116        for u in metadata.uses() {
2117            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
2118                dep_metadata.used_by.retain(|u| *u != metadata.id())
2119            }
2120        }
2121        for gid in metadata.global_ids() {
2122            self.entry_by_global_id.remove(&gid);
2123        }
2124
2125        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
2126        let schema = self.get_schema_mut(
2127            &metadata.name().qualifiers.database_spec,
2128            &metadata.name().qualifiers.schema_spec,
2129            conn_id,
2130        );
2131        if metadata.item_type() == CatalogItemType::Type {
2132            schema
2133                .types
2134                .remove(&metadata.name().item)
2135                .expect("catalog out of sync");
2136        } else {
2137            // Functions would need special handling, but we don't yet support
2138            // dropping functions.
2139            assert_ne!(metadata.item_type(), CatalogItemType::Func);
2140
2141            schema
2142                .items
2143                .remove(&metadata.name().item)
2144                .expect("catalog out of sync");
2145        };
2146
2147        if !id.is_system() {
2148            if let Some(cluster_id) = metadata.item().cluster_id() {
2149                assert!(
2150                    self.clusters_by_id
2151                        .get_mut(&cluster_id)
2152                        .expect("catalog out of sync")
2153                        .bound_objects
2154                        .remove(&id),
2155                    "catalog out of sync"
2156                );
2157            }
2158        }
2159
2160        metadata
2161    }
2162
2163    fn insert_introspection_source_index(
2164        &mut self,
2165        cluster_id: ClusterId,
2166        log: &'static BuiltinLog,
2167        item_id: CatalogItemId,
2168        global_id: GlobalId,
2169        oid: u32,
2170    ) {
2171        let (index_name, index) =
2172            self.create_introspection_source_index(cluster_id, log, global_id);
2173        self.insert_item(
2174            item_id,
2175            oid,
2176            index_name,
2177            index,
2178            MZ_SYSTEM_ROLE_ID,
2179            PrivilegeMap::default(),
2180        );
2181    }
2182
2183    fn create_introspection_source_index(
2184        &self,
2185        cluster_id: ClusterId,
2186        log: &'static BuiltinLog,
2187        global_id: GlobalId,
2188    ) -> (QualifiedItemName, CatalogItem) {
2189        let source_name = FullItemName {
2190            database: RawDatabaseSpecifier::Ambient,
2191            schema: log.schema.into(),
2192            item: log.name.into(),
2193        };
2194        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
2195        let mut index_name = QualifiedItemName {
2196            qualifiers: ItemQualifiers {
2197                database_spec: ResolvedDatabaseSpecifier::Ambient,
2198                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
2199            },
2200            item: index_name.clone(),
2201        };
2202        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
2203        let index_item_name = index_name.item.clone();
2204        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
2205        let index = CatalogItem::Index(Index {
2206            global_id,
2207            on: log_global_id,
2208            keys: log
2209                .variant
2210                .index_by()
2211                .into_iter()
2212                .map(MirScalarExpr::column)
2213                .collect(),
2214            create_sql: index_sql(
2215                index_item_name,
2216                cluster_id,
2217                source_name,
2218                &log.variant.desc(),
2219                &log.variant.index_by(),
2220            ),
2221            conn_id: None,
2222            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
2223            cluster_id,
2224            is_retained_metrics_object: false,
2225            custom_logical_compaction_window: None,
2226            optimized_plan: None,
2227            physical_plan: None,
2228            dataflow_metainfo: None,
2229        });
2230        (index_name, index)
2231    }
2232
2233    /// Insert system configuration `name` with `value`.
2234    ///
2235    /// Return a `bool` value indicating whether the configuration was modified
2236    /// by the call.
2237    fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
2238        Ok(Arc::make_mut(&mut self.system_configuration).set(name, value)?)
2239    }
2240
2241    /// Reset system configuration `name`.
2242    ///
2243    /// Return a `bool` value indicating whether the configuration was modified
2244    /// by the call.
2245    fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
2246        Ok(Arc::make_mut(&mut self.system_configuration).reset(name)?)
2247    }
2248}
2249
/// Sort [`StateUpdate`]s in dependency order.
///
/// The output contains all retractions first, then all additions. Additions are emitted group by
/// group in this order:
///
/// 1. "pre-cluster" updates (roles, role auth, databases, schemas, default/system privileges,
///    system configuration, network policies),
/// 2. builtin sources (system object mappings whose object type is `Source`),
/// 3. cluster-related updates (clusters, cluster/replica system configuration, introspection
///    source indexes, replicas),
/// 4. all other builtins,
/// 5. durable and temporary items, merged into a single sequence,
/// 6. "post-item" updates (comments, source references, audit log, storage collection metadata,
///    unfinalized shards).
///
/// Retractions use the same groups in the opposite order, and each retraction group is itself
/// reversed, so dependents are retracted before the objects they depend on.
///
/// # Panics
///
/// This function assumes that all provided `updates` have the same timestamp and will panic
/// otherwise. It also requires that the provided `updates` are consolidated, i.e. all contained
/// `StateUpdateKinds` are unique.
///
/// NOTE(review): both checks use `soft_assert_no_log!`, which presumably only panics when soft
/// assertions are enabled — confirm before relying on the panic in production builds.
fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
    // Route `update` to `retractions` or `additions` according to `diff`.
    fn push_update<T>(
        update: T,
        diff: StateDiff,
        retractions: &mut Vec<T>,
        additions: &mut Vec<T>,
    ) {
        match diff {
            StateDiff::Retraction => retractions.push(update),
            StateDiff::Addition => additions.push(update),
        }
    }

    soft_assert_no_log!(
        updates.iter().map(|update| update.ts).all_equal(),
        "all timestamps should be equal: {updates:?}"
    );
    soft_assert_no_log!(
        {
            let mut dedup = BTreeSet::new();
            updates.iter().all(|update| dedup.insert(&update.kind))
        },
        "updates should be consolidated: {updates:?}"
    );

    // Partition updates by type so that we can weave different update types into the right spots.
    let mut pre_cluster_retractions = Vec::new();
    let mut pre_cluster_additions = Vec::new();
    let mut cluster_retractions = Vec::new();
    let mut cluster_additions = Vec::new();
    let mut builtin_item_updates = Vec::new();
    let mut item_retractions = Vec::new();
    let mut item_additions = Vec::new();
    let mut temp_item_retractions = Vec::new();
    let mut temp_item_additions = Vec::new();
    let mut post_item_retractions = Vec::new();
    let mut post_item_additions = Vec::new();
    for update in updates {
        // Read the diff up front: in several arms `update` itself is moved into `push_update`
        // as the first argument, so `update.diff` can no longer be read there.
        let diff = update.diff.clone();
        match update.kind {
            StateUpdateKind::Role(_)
            | StateUpdateKind::RoleAuth(_)
            | StateUpdateKind::Database(_)
            | StateUpdateKind::Schema(_)
            | StateUpdateKind::DefaultPrivilege(_)
            | StateUpdateKind::SystemPrivilege(_)
            | StateUpdateKind::SystemConfiguration(_)
            | StateUpdateKind::NetworkPolicy(_) => push_update(
                update,
                diff,
                &mut pre_cluster_retractions,
                &mut pre_cluster_additions,
            ),
            StateUpdateKind::Cluster(_)
            | StateUpdateKind::ClusterSystemConfiguration(_)
            | StateUpdateKind::IntrospectionSourceIndex(_)
            | StateUpdateKind::ClusterReplica(_)
            | StateUpdateKind::ReplicaSystemConfiguration(_) => push_update(
                update,
                diff,
                &mut cluster_retractions,
                &mut cluster_additions,
            ),
            // Builtins are collected unsplit here; they are ordered and split into
            // retractions/additions below.
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
            }
            StateUpdateKind::TemporaryItem(item) => push_update(
                (item, update.ts, update.diff),
                diff,
                &mut temp_item_retractions,
                &mut temp_item_additions,
            ),
            StateUpdateKind::Item(item) => push_update(
                (item, update.ts, update.diff),
                diff,
                &mut item_retractions,
                &mut item_additions,
            ),
            StateUpdateKind::Comment(_)
            | StateUpdateKind::SourceReferences(_)
            | StateUpdateKind::AuditLog(_)
            | StateUpdateKind::StorageCollectionMetadata(_)
            | StateUpdateKind::UnfinalizedShard(_) => push_update(
                update,
                diff,
                &mut post_item_retractions,
                &mut post_item_additions,
            ),
        }
    }

    // Sort builtin item updates by dependency. The builtin definition order must be a dependency
    // order, so we can use that to avoid a more expensive topo sorting.
    let builtin_item_updates = builtin_item_updates
        .into_iter()
        .map(|(system_object_mapping, ts, diff)| {
            // The first tuple element stored in `BUILTIN_LOOKUP` is used as the sort key.
            let idx = BUILTIN_LOOKUP
                .get(&system_object_mapping.description)
                .expect("missing builtin")
                .0;
            (idx, system_object_mapping, ts, diff)
        })
        .sorted_by_key(|(idx, _, _, _)| *idx)
        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));

    // Partition builtins based on whether or not they should be applied before or after clusters.
    // Clusters depend on sources (introspection logs), so sources need to be applied first.
    // Everything else can be applied after clusters.
    let mut builtin_source_retractions = Vec::new();
    let mut builtin_source_additions = Vec::new();
    let mut other_builtin_retractions = Vec::new();
    let mut other_builtin_additions = Vec::new();
    for (builtin_item_update, ts, diff) in builtin_item_updates {
        let object_type = builtin_item_update.description.object_type;
        let update = StateUpdate {
            kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
            ts,
            diff,
        };
        if object_type == CatalogItemType::Source {
            push_update(
                update,
                diff,
                &mut builtin_source_retractions,
                &mut builtin_source_additions,
            );
        } else {
            push_update(
                update,
                diff,
                &mut other_builtin_retractions,
                &mut other_builtin_additions,
            );
        }
    }

    /// Sort items by their dependencies using topological sort.
    ///
    /// Dependencies are discovered by parsing each item's `create_sql` and calling
    /// `mz_sql::names::dependencies` on the resulting statement.
    ///
    /// # Panics
    ///
    /// This function requires that all provided items have unique item IDs. It also panics if an
    /// item's `create_sql` does not parse or its dependencies cannot be determined.
    fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
        tracing::debug!(?items, "sorting items by dependencies");

        let key_fn = |item: &(mz_catalog::durable::Item, _, _)| item.0.id;
        let dependencies_fn = |item: &(mz_catalog::durable::Item, _, _)| {
            let statement = mz_sql::parse::parse(&item.0.create_sql)
                .expect("valid create_sql")
                .into_element()
                .ast;
            mz_sql::names::dependencies(&statement).expect("failed to find dependencies of item")
        };
        sort_topological(items, key_fn, dependencies_fn);
    }

    /// Sort item updates by dependency.
    ///
    /// First we group items into groups that are totally ordered by dependency. For example, when
    /// sorting all items by dependency we know that all tables can come after all sources, because
    /// a source can never depend on a table. Second, we sort the items in each group in
    /// topological order, or by ID, depending on the type.
    ///
    /// It used to be the case that the ID order of ALL items matched the dependency order. However,
    /// certain migrations shuffled item IDs around s.t. this was no longer true. A much better
    /// approach would be to investigate each item, discover their exact dependencies, and then
    /// perform a topological sort. This is non-trivial because we only have the CREATE SQL of each
    /// item here. Within the SQL the dependent items are sometimes referred to by ID and sometimes
    /// referred to by name.
    ///
    /// The logic of this function should match [`sort_temp_item_updates`].
    fn sort_item_updates(
        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
        // Partition items into groups s.t. each item in one group has a predefined order with all
        // items in other groups. For example, all sinks are ordered greater than all tables.
        let mut types = Vec::new();
        // N.B. Functions can depend on system tables, but not user tables.
        // TODO(udf): This will change when UDFs are supported.
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        for update in item_updates {
            match update.0.item_type() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
            }
        }

        // For some groups, the items in them can depend on each other and can be `ALTER`ed so that
        // an item ends up depending on an item with a greater ID. Thus we need to perform
        // topological sort for these groups.
        sort_items_topological(&mut connections);
        sort_items_topological(&mut derived_items);

        // Other groups we can simply sort by ID.
        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut sources,
            &mut tables,
            &mut sinks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        // Concatenate the groups in their fixed inter-group order.
        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .collect()
    }

    let item_retractions = sort_item_updates(item_retractions);
    let item_additions = sort_item_updates(item_additions);

    /// Sort temporary item updates by dependency.
    ///
    /// The logic of this function should match [`sort_item_updates`].
    ///
    /// NOTE(review): unlike `sort_item_updates`, the `connections` and `derived_items` groups are
    /// sorted by ID here rather than topologically. That is only correct if temporary items can
    /// never come to depend on a temporary item with a greater ID — confirm.
    fn sort_temp_item_updates(
        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
        // Partition items into groups s.t. each item in one group has a predefined order with all
        // items in other groups. For example, all sinks are ordered greater than all tables.
        let mut types = Vec::new();
        // N.B. Functions can depend on system tables, but not user tables.
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        for update in temp_item_updates {
            match update.0.item_type() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
            }
        }

        // Within each group, sort by ID.
        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut connections,
            &mut sources,
            &mut tables,
            &mut derived_items,
            &mut sinks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        // Concatenate the groups in the same inter-group order as `sort_item_updates`.
        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .collect()
    }
    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
    let temp_item_additions = sort_temp_item_updates(temp_item_additions);

    /// Merge sorted temporary and non-temp items.
    ///
    /// This is a two-way merge that repeatedly emits whichever queue front has the smaller item
    /// ID, then drains whatever remains of either queue. An item and a temporary item must never
    /// share an ID (that case is `unreachable!`).
    ///
    /// NOTE(review): `sort_item_updates` orders items by type group and, within some groups,
    /// topologically, so `item_updates` is not necessarily in ascending ID order. The merge only
    /// compares the current fronts, so the relative order inside each input is preserved.
    fn merge_item_updates(
        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> Vec<StateUpdate> {
        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());

        while let (Some((item, _, _)), Some((temp_item, _, _))) =
            (item_updates.front(), temp_item_updates.front())
        {
            if item.id < temp_item.id {
                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::Item(item),
                    ts,
                    diff,
                });
            } else if item.id > temp_item.id {
                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::TemporaryItem(temp_item),
                    ts,
                    diff,
                });
            } else {
                unreachable!(
                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
                );
            }
        }

        // At most one of the two queues is non-empty here; append its remainder in order.
        while let Some((item, ts, diff)) = item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::Item(item),
                ts,
                diff,
            });
        }

        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::TemporaryItem(temp_item),
                ts,
                diff,
            });
        }

        state_updates
    }
    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
    let item_additions = merge_item_updates(item_additions, temp_item_additions);

    // Put everything back together.
    iter::empty()
        // All retractions must be reversed.
        .chain(post_item_retractions.into_iter().rev())
        .chain(item_retractions.into_iter().rev())
        .chain(other_builtin_retractions.into_iter().rev())
        .chain(cluster_retractions.into_iter().rev())
        .chain(builtin_source_retractions.into_iter().rev())
        .chain(pre_cluster_retractions.into_iter().rev())
        .chain(pre_cluster_additions)
        .chain(builtin_source_additions)
        .chain(cluster_additions)
        .chain(other_builtin_additions)
        .chain(item_additions)
        .chain(post_item_additions)
        .collect()
}
2619
/// Groups of updates of certain types are applied in batches to improve
/// performance. A constraint is that updates must be applied in order. This
/// process is modeled as a state machine that batches then applies groups of
/// updates.
///
/// Consecutive updates that map to the same variant are accumulated into one
/// batch; when an update of a different variant arrives, the current batch is
/// applied and a new batch is started from that update.
enum ApplyState {
    /// Additions of builtin views.
    ///
    /// Each entry pairs the builtin view definition with the [`CatalogItemId`]
    /// and [`GlobalId`] assigned to it.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Item updates that aren't builtin view additions.
    ///
    /// This contains all updates whose application requires calling
    /// `parse_item` and thus toggling the `enable_for_item_parsing` feature
    /// flags.
    Items(Vec<StateUpdate>),
    /// All other updates.
    Updates(Vec<StateUpdate>),
}
2636
2637impl ApplyState {
2638    fn new(update: StateUpdate) -> Self {
2639        use StateUpdateKind::*;
2640        match &update.kind {
2641            SystemObjectMapping(som)
2642                if som.description.object_type == CatalogItemType::View
2643                    && update.diff == StateDiff::Addition =>
2644            {
2645                let view_addition = lookup_builtin_view_addition(som.clone());
2646                Self::BuiltinViewAdditions(vec![view_addition])
2647            }
2648
2649            IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
2650                Self::Items(vec![update])
2651            }
2652
2653            Role(_)
2654            | RoleAuth(_)
2655            | Database(_)
2656            | Schema(_)
2657            | DefaultPrivilege(_)
2658            | SystemPrivilege(_)
2659            | SystemConfiguration(_)
2660            | ClusterSystemConfiguration(_)
2661            | ReplicaSystemConfiguration(_)
2662            | Cluster(_)
2663            | NetworkPolicy(_)
2664            | ClusterReplica(_)
2665            | SourceReferences(_)
2666            | Comment(_)
2667            | AuditLog(_)
2668            | StorageCollectionMetadata(_)
2669            | UnfinalizedShard(_) => Self::Updates(vec![update]),
2670        }
2671    }
2672
2673    /// Apply all updates that have been batched in `self`.
2674    ///
2675    /// We make sure to enable all "enable_for_item_parsing" feature flags when applying item
2676    /// updates during bootstrap. See [`CatalogState::with_enable_for_item_parsing`] for more
2677    /// details.
2678    async fn apply(
2679        self,
2680        state: &mut CatalogState,
2681        retractions: &mut InProgressRetractions,
2682        local_expression_cache: &mut LocalExpressionCache,
2683    ) -> (
2684        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2685        Vec<ParsedStateUpdate>,
2686    ) {
2687        match self {
2688            Self::BuiltinViewAdditions(builtin_view_additions) => {
2689                let restore = Arc::clone(&state.system_configuration);
2690                Arc::make_mut(&mut state.system_configuration).enable_for_item_parsing();
2691                let builtin_table_updates = CatalogState::parse_builtin_views(
2692                    state,
2693                    builtin_view_additions,
2694                    retractions,
2695                    local_expression_cache,
2696                )
2697                .await;
2698                state.system_configuration = restore;
2699                (builtin_table_updates, Vec::new())
2700            }
2701            Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
2702                state
2703                    .apply_updates_inner(updates, retractions, local_expression_cache)
2704                    .expect("corrupt catalog")
2705            }),
2706            Self::Updates(updates) => state
2707                .apply_updates_inner(updates, retractions, local_expression_cache)
2708                .expect("corrupt catalog"),
2709        }
2710    }
2711
2712    async fn step(
2713        self,
2714        next: Self,
2715        state: &mut CatalogState,
2716        retractions: &mut InProgressRetractions,
2717        local_expression_cache: &mut LocalExpressionCache,
2718    ) -> (
2719        Self,
2720        (
2721            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2722            Vec<ParsedStateUpdate>,
2723        ),
2724    ) {
2725        match (self, next) {
2726            (
2727                Self::BuiltinViewAdditions(mut builtin_view_additions),
2728                Self::BuiltinViewAdditions(next_builtin_view_additions),
2729            ) => {
2730                // Continue batching builtin view additions.
2731                builtin_view_additions.extend(next_builtin_view_additions);
2732                (
2733                    Self::BuiltinViewAdditions(builtin_view_additions),
2734                    (Vec::new(), Vec::new()),
2735                )
2736            }
2737            (Self::Items(mut updates), Self::Items(next_updates)) => {
2738                // Continue batching item updates.
2739                updates.extend(next_updates);
2740                (Self::Items(updates), (Vec::new(), Vec::new()))
2741            }
2742            (Self::Updates(mut updates), Self::Updates(next_updates)) => {
2743                // Continue batching updates.
2744                updates.extend(next_updates);
2745                (Self::Updates(updates), (Vec::new(), Vec::new()))
2746            }
2747            (apply_state, next_apply_state) => {
2748                // Apply the current batch and start batching new apply state.
2749                let updates = apply_state
2750                    .apply(state, retractions, local_expression_cache)
2751                    .await;
2752                (next_apply_state, updates)
2753            }
2754        }
2755    }
2756}
2757
/// The minimal map interface used by [`apply_inverted_lookup`] and [`apply_with_update`].
///
/// Implemented for both [`BTreeMap`] and [`imbl::OrdMap`], so the helpers can update either kind
/// of catalog map.
trait MutableMap<K, V> {
    /// Stores `value` under `key`, returning the value previously stored there, if any.
    fn insert(&mut self, key: K, value: V) -> Option<V>;
    /// Removes and returns the value stored under `key`, if any.
    fn remove(&mut self, key: &K) -> Option<V>;
}

impl<K: Ord, V> MutableMap<K, V> for BTreeMap<K, V> {
    // Both methods forward to the inherent `BTreeMap` methods of the same name.
    fn remove(&mut self, key: &K) -> Option<V> {
        BTreeMap::remove(self, key)
    }

    fn insert(&mut self, key: K, value: V) -> Option<V> {
        BTreeMap::insert(self, key, value)
    }
}
2773
2774impl<K: Ord + Clone, V: Clone> MutableMap<K, V> for imbl::OrdMap<K, V> {
2775    fn insert(&mut self, key: K, value: V) -> Option<V> {
2776        imbl::OrdMap::insert(self, key, value)
2777    }
2778    fn remove(&mut self, key: &K) -> Option<V> {
2779        imbl::OrdMap::remove(self, key)
2780    }
2781}
2782
2783/// Helper method to updated inverted lookup maps. The keys are generally names and the values are
2784/// generally IDs.
2785///
2786/// Importantly, when retracting it's expected that the existing value will match `value` exactly.
2787fn apply_inverted_lookup<K, V>(map: &mut impl MutableMap<K, V>, key: &K, value: V, diff: StateDiff)
2788where
2789    K: Ord + Clone + Debug,
2790    V: PartialEq + Debug,
2791{
2792    match diff {
2793        StateDiff::Retraction => {
2794            let prev = map.remove(key);
2795            assert_eq!(
2796                prev,
2797                Some(value),
2798                "retraction does not match existing value: {key:?}"
2799            );
2800        }
2801        StateDiff::Addition => {
2802            let prev = map.insert(key.clone(), value);
2803            assert_eq!(
2804                prev, None,
2805                "values must be explicitly retracted before inserting a new value: {key:?}"
2806            );
2807        }
2808    }
2809}
2810
2811/// Helper method to update catalog state, that may need to be updated from a previously retracted
2812/// object.
2813fn apply_with_update<K, V, D>(
2814    map: &mut impl MutableMap<K, V>,
2815    durable: D,
2816    key_fn: impl FnOnce(&D) -> K,
2817    diff: StateDiff,
2818    retractions: &mut BTreeMap<D::Key, V>,
2819) where
2820    K: Ord,
2821    V: UpdateFrom<D> + PartialEq + Debug,
2822    D: DurableType,
2823    D::Key: Ord,
2824{
2825    match diff {
2826        StateDiff::Retraction => {
2827            let mem_key = key_fn(&durable);
2828            let value = map
2829                .remove(&mem_key)
2830                .expect("retraction does not match existing value: {key:?}");
2831            let durable_key = durable.into_key_value().0;
2832            retractions.insert(durable_key, value);
2833        }
2834        StateDiff::Addition => {
2835            let mem_key = key_fn(&durable);
2836            let durable_key = durable.key();
2837            let value = match retractions.remove(&durable_key) {
2838                Some(mut retraction) => {
2839                    retraction.update_from(durable);
2840                    retraction
2841                }
2842                None => durable.into(),
2843            };
2844            let prev = map.insert(mem_key, value);
2845            assert_eq!(
2846                prev, None,
2847                "values must be explicitly retracted before inserting a new value"
2848            );
2849        }
2850    }
2851}
2852
2853/// Looks up a [`BuiltinView`] from a [`SystemObjectMapping`].
2854fn lookup_builtin_view_addition(
2855    mapping: SystemObjectMapping,
2856) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2857    let (_, builtin) = BUILTIN_LOOKUP
2858        .get(&mapping.description)
2859        .expect("missing builtin view");
2860    let Builtin::View(view) = builtin else {
2861        unreachable!("programming error, expected BuiltinView found {builtin:?}");
2862    };
2863
2864    (
2865        view,
2866        mapping.unique_identifier.catalog_id,
2867        mapping.unique_identifier.global_id,
2868    )
2869}