mz_adapter/catalog/
apply.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to applying updates from a [`mz_catalog::durable::DurableCatalogState`] to a
11//! [`CatalogState`].
12
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25    BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28    ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29    SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
35    NetworkPolicy, Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36    TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
40use mz_controller_types::ClusterId;
41use mz_expr::MirScalarExpr;
42use mz_ore::collections::CollectionExt;
43use mz_ore::tracing::OpenTelemetryContext;
44use mz_ore::{assert_none, instrument, soft_assert_no_log};
45use mz_pgrepr::oid::INVALID_OID;
46use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
47use mz_repr::role_id::RoleId;
48use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
49use mz_sql::catalog::CatalogError as SqlCatalogError;
50use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
51use mz_sql::names::{
52    FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
53    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
54};
55use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
56use mz_sql::session::vars::{VarError, VarInput};
57use mz_sql::{plan, rbac};
58use mz_sql_parser::ast::Expr;
59use mz_storage_types::sources::Timeline;
60use tracing::{Instrument, info_span, warn};
61
62use crate::AdapterError;
63use crate::catalog::state::LocalExpressionCache;
64use crate::catalog::{BuiltinTableUpdate, CatalogState};
65use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
66use crate::util::index_sql;
67
68/// Maintains the state of retractions while applying catalog state updates for a single timestamp.
69/// [`CatalogState`] maintains denormalized state for certain catalog objects. Updating an object
70/// results in applying a retraction for that object followed by applying an addition for that
71/// object. When applying those additions it can be extremely expensive to re-build that
72/// denormalized state from scratch. To avoid that issue we stash the denormalized state from
73/// retractions, so it can be used during additions.
74///
75/// Not all objects maintain denormalized state, so we only stash the retractions for the subset of
76/// objects that maintain denormalized state.
77// TODO(jkosh44) It might be simpler or more future proof to include all object types here, even if
78// the update step is a no-op for certain types.
79#[derive(Debug, Clone, Default)]
80struct InProgressRetractions {
81    roles: BTreeMap<RoleKey, Role>,
82    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
83    databases: BTreeMap<DatabaseKey, Database>,
84    schemas: BTreeMap<SchemaKey, Schema>,
85    clusters: BTreeMap<ClusterKey, Cluster>,
86    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
87    items: BTreeMap<ItemKey, CatalogEntry>,
88    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
89    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
90    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
91}
92
93impl CatalogState {
94    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
95    ///
96    /// Returns builtin table updates corresponding to the changes to catalog state.
97    ///
98    /// This is meant specifically for bootstrapping because it batches and applies builtin view
99    /// additions separately from other update types.
100    #[must_use]
101    #[instrument]
102    pub(crate) async fn apply_updates_for_bootstrap(
103        &mut self,
104        updates: Vec<StateUpdate>,
105        local_expression_cache: &mut LocalExpressionCache,
106    ) -> (
107        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
108        Vec<ParsedStateUpdate>,
109    ) {
110        let mut builtin_table_updates = Vec::with_capacity(updates.len());
111        let mut catalog_updates = Vec::with_capacity(updates.len());
112        let updates = sort_updates(updates);
113
114        let mut groups: Vec<Vec<_>> = Vec::new();
115        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
116            groups.push(updates.collect());
117        }
118        for updates in groups {
119            let mut apply_state = BootstrapApplyState::Updates(Vec::new());
120            let mut retractions = InProgressRetractions::default();
121
122            for update in updates {
123                let next_apply_state = BootstrapApplyState::new(update);
124                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
125                    .step(
126                        next_apply_state,
127                        self,
128                        &mut retractions,
129                        local_expression_cache,
130                    )
131                    .await;
132                apply_state = next_apply_state;
133                builtin_table_updates.extend(builtin_table_update);
134                catalog_updates.extend(catalog_update);
135            }
136
137            // Apply remaining state.
138            let (builtin_table_update, catalog_update) = apply_state
139                .apply(self, &mut retractions, local_expression_cache)
140                .await;
141            builtin_table_updates.extend(builtin_table_update);
142            catalog_updates.extend(catalog_update);
143        }
144
145        (builtin_table_updates, catalog_updates)
146    }
147
148    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
149    ///
150    /// Returns builtin table updates corresponding to the changes to catalog state.
151    #[instrument]
152    pub(crate) fn apply_updates(
153        &mut self,
154        updates: Vec<StateUpdate>,
155    ) -> Result<
156        (
157            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
158            Vec<ParsedStateUpdate>,
159        ),
160        CatalogError,
161    > {
162        let mut builtin_table_updates = Vec::with_capacity(updates.len());
163        let mut catalog_updates = Vec::with_capacity(updates.len());
164
165        // First, consolidate updates. The code that applies parsed state
166        // updates _requires_ that the given updates are consolidated. There
167        // must be at most one addition and/or one retraction for a given item,
168        // as identified by that items ID type.
169        let updates = Self::consolidate_updates(updates);
170
171        // Then bring it into the pseudo-topological order that we need for
172        // updating our in-memory state and generating builtin table updates.
173        let updates = sort_updates(updates);
174
175        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
176            let mut retractions = InProgressRetractions::default();
177            let (builtin_table_update, catalog_updates_op) = self.apply_updates_inner(
178                updates.collect(),
179                &mut retractions,
180                &mut LocalExpressionCache::Closed,
181            )?;
182            builtin_table_updates.extend(builtin_table_update);
183            catalog_updates.extend(catalog_updates_op);
184        }
185
186        Ok((builtin_table_updates, catalog_updates))
187    }
188
189    /// It can happen that the sequencing logic creates "fluctuating" updates
190    /// for a given catalog ID. For example, when doing a `DROP OWNED BY ...`,
191    /// for a table, there will be a retraction of the original table state,
192    /// then an addition for the same table but stripped of some of the roles
193    /// and access things, and then a retraction for that intermediate table
194    /// state. By consolidating, the intermediate state addition/retraction will
195    /// cancel out and we'll only see the retraction for the original state.
196    fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
197        let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
198            .into_iter()
199            .map(|update| (update.kind, update.ts, update.diff.into()))
200            .collect_vec();
201
202        consolidate_updates(&mut updates);
203
204        updates
205            .into_iter()
206            .filter(|(_kind, _ts, diff)| *diff != 0.into())
207            .map(|(kind, ts, diff)| StateUpdate {
208                kind,
209                ts,
210                diff: diff
211                    .try_into()
212                    .expect("catalog state cannot have diff other than -1 or 1"),
213            })
214            .collect_vec()
215    }
216
217    #[instrument(level = "debug")]
218    fn apply_updates_inner(
219        &mut self,
220        updates: Vec<StateUpdate>,
221        retractions: &mut InProgressRetractions,
222        local_expression_cache: &mut LocalExpressionCache,
223    ) -> Result<
224        (
225            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
226            Vec<ParsedStateUpdate>,
227        ),
228        CatalogError,
229    > {
230        soft_assert_no_log!(
231            updates.iter().map(|update| update.ts).all_equal(),
232            "all timestamps should be equal: {updates:?}"
233        );
234
235        let mut update_system_config = false;
236
237        let mut builtin_table_updates = Vec::with_capacity(updates.len());
238        let mut catalog_updates = Vec::new();
239
240        for state_update in updates {
241            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
242                update_system_config = true;
243            }
244
245            match state_update.diff {
246                StateDiff::Retraction => {
247                    // We want the parsed catalog updates to match the state of
248                    // the catalog _before_ applying a retraction. So that we can
249                    // still have useful in-memory state to work with.
250                    if let Some(update) =
251                        parsed_state_updates::parse_state_update(self, state_update.clone())
252                    {
253                        catalog_updates.push(update);
254                    }
255
256                    // We want the builtin table retraction to match the state of the catalog
257                    // before applying the update.
258                    builtin_table_updates.extend(self.generate_builtin_table_update(
259                        state_update.kind.clone(),
260                        state_update.diff,
261                    ));
262                    self.apply_update(
263                        state_update.kind,
264                        state_update.diff,
265                        retractions,
266                        local_expression_cache,
267                    )?;
268                }
269                StateDiff::Addition => {
270                    self.apply_update(
271                        state_update.kind.clone(),
272                        state_update.diff,
273                        retractions,
274                        local_expression_cache,
275                    )?;
276                    // We want the builtin table addition to match the state of
277                    // the catalog after applying the update. So that we already
278                    // have useful in-memory state to work with.
279                    builtin_table_updates.extend(self.generate_builtin_table_update(
280                        state_update.kind.clone(),
281                        state_update.diff,
282                    ));
283
284                    // We want the parsed catalog updates to match the state of
285                    // the catalog _after_ applying an addition.
286                    if let Some(update) =
287                        parsed_state_updates::parse_state_update(self, state_update.clone())
288                    {
289                        catalog_updates.push(update);
290                    }
291                }
292            }
293        }
294
295        if update_system_config {
296            self.system_configuration.dyncfg_updates();
297        }
298
299        Ok((builtin_table_updates, catalog_updates))
300    }
301
302    #[instrument(level = "debug")]
303    fn apply_update(
304        &mut self,
305        kind: StateUpdateKind,
306        diff: StateDiff,
307        retractions: &mut InProgressRetractions,
308        local_expression_cache: &mut LocalExpressionCache,
309    ) -> Result<(), CatalogError> {
310        match kind {
311            StateUpdateKind::Role(role) => {
312                self.apply_role_update(role, diff, retractions);
313            }
314            StateUpdateKind::RoleAuth(role_auth) => {
315                self.apply_role_auth_update(role_auth, diff, retractions);
316            }
317            StateUpdateKind::Database(database) => {
318                self.apply_database_update(database, diff, retractions);
319            }
320            StateUpdateKind::Schema(schema) => {
321                self.apply_schema_update(schema, diff, retractions);
322            }
323            StateUpdateKind::DefaultPrivilege(default_privilege) => {
324                self.apply_default_privilege_update(default_privilege, diff, retractions);
325            }
326            StateUpdateKind::SystemPrivilege(system_privilege) => {
327                self.apply_system_privilege_update(system_privilege, diff, retractions);
328            }
329            StateUpdateKind::SystemConfiguration(system_configuration) => {
330                self.apply_system_configuration_update(system_configuration, diff, retractions);
331            }
332            StateUpdateKind::Cluster(cluster) => {
333                self.apply_cluster_update(cluster, diff, retractions);
334            }
335            StateUpdateKind::NetworkPolicy(network_policy) => {
336                self.apply_network_policy_update(network_policy, diff, retractions);
337            }
338            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
339                self.apply_introspection_source_index_update(
340                    introspection_source_index,
341                    diff,
342                    retractions,
343                );
344            }
345            StateUpdateKind::ClusterReplica(cluster_replica) => {
346                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
347            }
348            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
349                self.apply_system_object_mapping_update(
350                    system_object_mapping,
351                    diff,
352                    retractions,
353                    local_expression_cache,
354                );
355            }
356            StateUpdateKind::TemporaryItem(item) => {
357                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
358            }
359            StateUpdateKind::Item(item) => {
360                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
361            }
362            StateUpdateKind::Comment(comment) => {
363                self.apply_comment_update(comment, diff, retractions);
364            }
365            StateUpdateKind::SourceReferences(source_reference) => {
366                self.apply_source_references_update(source_reference, diff, retractions);
367            }
368            StateUpdateKind::AuditLog(_audit_log) => {
369                // Audit logs are not stored in-memory.
370            }
371            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
372                self.apply_storage_collection_metadata_update(
373                    storage_collection_metadata,
374                    diff,
375                    retractions,
376                );
377            }
378            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
379                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
380            }
381        }
382
383        Ok(())
384    }
385
386    #[instrument(level = "debug")]
387    fn apply_role_auth_update(
388        &mut self,
389        role_auth: mz_catalog::durable::RoleAuth,
390        diff: StateDiff,
391        retractions: &mut InProgressRetractions,
392    ) {
393        apply_with_update(
394            &mut self.role_auth_by_id,
395            role_auth,
396            |role_auth| role_auth.role_id,
397            diff,
398            &mut retractions.role_auths,
399        );
400    }
401
402    #[instrument(level = "debug")]
403    fn apply_role_update(
404        &mut self,
405        role: mz_catalog::durable::Role,
406        diff: StateDiff,
407        retractions: &mut InProgressRetractions,
408    ) {
409        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
410        apply_with_update(
411            &mut self.roles_by_id,
412            role,
413            |role| role.id,
414            diff,
415            &mut retractions.roles,
416        );
417    }
418
419    #[instrument(level = "debug")]
420    fn apply_database_update(
421        &mut self,
422        database: mz_catalog::durable::Database,
423        diff: StateDiff,
424        retractions: &mut InProgressRetractions,
425    ) {
426        apply_inverted_lookup(
427            &mut self.database_by_name,
428            &database.name,
429            database.id,
430            diff,
431        );
432        apply_with_update(
433            &mut self.database_by_id,
434            database,
435            |database| database.id,
436            diff,
437            &mut retractions.databases,
438        );
439    }
440
441    #[instrument(level = "debug")]
442    fn apply_schema_update(
443        &mut self,
444        schema: mz_catalog::durable::Schema,
445        diff: StateDiff,
446        retractions: &mut InProgressRetractions,
447    ) {
448        let (schemas_by_id, schemas_by_name) = match &schema.database_id {
449            Some(database_id) => {
450                let db = self
451                    .database_by_id
452                    .get_mut(database_id)
453                    .expect("catalog out of sync");
454                (&mut db.schemas_by_id, &mut db.schemas_by_name)
455            }
456            None => (
457                &mut self.ambient_schemas_by_id,
458                &mut self.ambient_schemas_by_name,
459            ),
460        };
461        apply_inverted_lookup(schemas_by_name, &schema.name, schema.id, diff);
462        apply_with_update(
463            schemas_by_id,
464            schema,
465            |schema| schema.id,
466            diff,
467            &mut retractions.schemas,
468        );
469    }
470
471    #[instrument(level = "debug")]
472    fn apply_default_privilege_update(
473        &mut self,
474        default_privilege: mz_catalog::durable::DefaultPrivilege,
475        diff: StateDiff,
476        _retractions: &mut InProgressRetractions,
477    ) {
478        match diff {
479            StateDiff::Addition => self
480                .default_privileges
481                .grant(default_privilege.object, default_privilege.acl_item),
482            StateDiff::Retraction => self
483                .default_privileges
484                .revoke(&default_privilege.object, &default_privilege.acl_item),
485        }
486    }
487
488    #[instrument(level = "debug")]
489    fn apply_system_privilege_update(
490        &mut self,
491        system_privilege: MzAclItem,
492        diff: StateDiff,
493        _retractions: &mut InProgressRetractions,
494    ) {
495        match diff {
496            StateDiff::Addition => self.system_privileges.grant(system_privilege),
497            StateDiff::Retraction => self.system_privileges.revoke(&system_privilege),
498        }
499    }
500
501    #[instrument(level = "debug")]
502    fn apply_system_configuration_update(
503        &mut self,
504        system_configuration: mz_catalog::durable::SystemConfiguration,
505        diff: StateDiff,
506        _retractions: &mut InProgressRetractions,
507    ) {
508        let res = match diff {
509            StateDiff::Addition => self.insert_system_configuration(
510                &system_configuration.name,
511                VarInput::Flat(&system_configuration.value),
512            ),
513            StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
514        };
515        match res {
516            Ok(_) => (),
517            // When system variables are deleted, nothing deletes them from the underlying
518            // durable catalog, which isn't great. Still, we need to be able to ignore
519            // unknown variables.
520            Err(Error {
521                kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
522            }) => {
523                warn!(%name, "unknown system parameter from catalog storage");
524            }
525            Err(e) => panic!("unable to update system variable: {e:?}"),
526        }
527    }
528
529    #[instrument(level = "debug")]
530    fn apply_cluster_update(
531        &mut self,
532        cluster: mz_catalog::durable::Cluster,
533        diff: StateDiff,
534        retractions: &mut InProgressRetractions,
535    ) {
536        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
537        apply_with_update(
538            &mut self.clusters_by_id,
539            cluster,
540            |cluster| cluster.id,
541            diff,
542            &mut retractions.clusters,
543        );
544    }
545
546    #[instrument(level = "debug")]
547    fn apply_network_policy_update(
548        &mut self,
549        policy: mz_catalog::durable::NetworkPolicy,
550        diff: StateDiff,
551        retractions: &mut InProgressRetractions,
552    ) {
553        apply_inverted_lookup(
554            &mut self.network_policies_by_name,
555            &policy.name,
556            policy.id,
557            diff,
558        );
559        apply_with_update(
560            &mut self.network_policies_by_id,
561            policy,
562            |policy| policy.id,
563            diff,
564            &mut retractions.network_policies,
565        );
566    }
567
568    #[instrument(level = "debug")]
569    fn apply_introspection_source_index_update(
570        &mut self,
571        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
572        diff: StateDiff,
573        retractions: &mut InProgressRetractions,
574    ) {
575        let cluster = self
576            .clusters_by_id
577            .get_mut(&introspection_source_index.cluster_id)
578            .expect("catalog out of sync");
579        let log = BUILTIN_LOG_LOOKUP
580            .get(introspection_source_index.name.as_str())
581            .expect("missing log");
582        apply_inverted_lookup(
583            &mut cluster.log_indexes,
584            &log.variant,
585            introspection_source_index.index_id,
586            diff,
587        );
588
589        match diff {
590            StateDiff::Addition => {
591                if let Some(mut entry) = retractions
592                    .introspection_source_indexes
593                    .remove(&introspection_source_index.item_id)
594                {
595                    // This should only happen during startup as a result of builtin migrations. We
596                    // create a new index item and replace the old one with it.
597                    let (index_name, index) = self.create_introspection_source_index(
598                        introspection_source_index.cluster_id,
599                        log,
600                        introspection_source_index.index_id,
601                    );
602                    assert_eq!(entry.id, introspection_source_index.item_id);
603                    assert_eq!(entry.oid, introspection_source_index.oid);
604                    assert_eq!(entry.name, index_name);
605                    entry.item = index;
606                    self.insert_entry(entry);
607                } else {
608                    self.insert_introspection_source_index(
609                        introspection_source_index.cluster_id,
610                        log,
611                        introspection_source_index.item_id,
612                        introspection_source_index.index_id,
613                        introspection_source_index.oid,
614                    );
615                }
616            }
617            StateDiff::Retraction => {
618                let entry = self.drop_item(introspection_source_index.item_id);
619                retractions
620                    .introspection_source_indexes
621                    .insert(entry.id, entry);
622            }
623        }
624    }
625
626    #[instrument(level = "debug")]
627    fn apply_cluster_replica_update(
628        &mut self,
629        cluster_replica: mz_catalog::durable::ClusterReplica,
630        diff: StateDiff,
631        _retractions: &mut InProgressRetractions,
632    ) {
633        let cluster = self
634            .clusters_by_id
635            .get(&cluster_replica.cluster_id)
636            .expect("catalog out of sync");
637        let azs = cluster.availability_zones();
638        let location = self
639            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
640            .expect("catalog in unexpected state");
641        let cluster = self
642            .clusters_by_id
643            .get_mut(&cluster_replica.cluster_id)
644            .expect("catalog out of sync");
645        apply_inverted_lookup(
646            &mut cluster.replica_id_by_name_,
647            &cluster_replica.name,
648            cluster_replica.replica_id,
649            diff,
650        );
651        match diff {
652            StateDiff::Retraction => {
653                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
654                assert!(
655                    prev.is_some(),
656                    "retraction does not match existing value: {:?}",
657                    cluster_replica.replica_id
658                );
659            }
660            StateDiff::Addition => {
661                let logging = ReplicaLogging {
662                    log_logging: cluster_replica.config.logging.log_logging,
663                    interval: cluster_replica.config.logging.interval,
664                };
665                let config = ReplicaConfig {
666                    location,
667                    compute: ComputeReplicaConfig { logging },
668                };
669                let mem_cluster_replica = ClusterReplica {
670                    name: cluster_replica.name.clone(),
671                    cluster_id: cluster_replica.cluster_id,
672                    replica_id: cluster_replica.replica_id,
673                    config,
674                    owner_id: cluster_replica.owner_id,
675                };
676                let prev = cluster
677                    .replicas_by_id_
678                    .insert(cluster_replica.replica_id, mem_cluster_replica);
679                assert_eq!(
680                    prev, None,
681                    "values must be explicitly retracted before inserting a new value: {:?}",
682                    cluster_replica.replica_id
683                );
684            }
685        }
686    }
687
688    #[instrument(level = "debug")]
689    fn apply_system_object_mapping_update(
690        &mut self,
691        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
692        diff: StateDiff,
693        retractions: &mut InProgressRetractions,
694        local_expression_cache: &mut LocalExpressionCache,
695    ) {
696        let item_id = system_object_mapping.unique_identifier.catalog_id;
697        let global_id = system_object_mapping.unique_identifier.global_id;
698
699        if system_object_mapping.unique_identifier.runtime_alterable() {
700            // Runtime-alterable system objects have real entries in the items
701            // collection and so get handled through the normal `insert_item`
702            // and `drop_item` code paths.
703            return;
704        }
705
706        if let StateDiff::Retraction = diff {
707            let entry = self.drop_item(item_id);
708            retractions.system_object_mappings.insert(item_id, entry);
709            return;
710        }
711
712        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
713            // This implies that we updated the fingerprint for some builtin item. The retraction
714            // was parsed, planned, and optimized using the compiled in definition, not the
715            // definition from a previous version. So we can just stick the old entry back into the
716            // catalog.
717            self.insert_entry(entry);
718            return;
719        }
720
721        let builtin = BUILTIN_LOOKUP
722            .get(&system_object_mapping.description)
723            .expect("missing builtin")
724            .1;
725        let schema_name = builtin.schema();
726        let schema_id = self
727            .ambient_schemas_by_name
728            .get(schema_name)
729            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
730        let name = QualifiedItemName {
731            qualifiers: ItemQualifiers {
732                database_spec: ResolvedDatabaseSpecifier::Ambient,
733                schema_spec: SchemaSpecifier::Id(*schema_id),
734            },
735            item: builtin.name().into(),
736        };
737        match builtin {
738            Builtin::Log(log) => {
739                let mut acl_items = vec![rbac::owner_privilege(
740                    mz_sql::catalog::ObjectType::Source,
741                    MZ_SYSTEM_ROLE_ID,
742                )];
743                acl_items.extend_from_slice(&log.access);
744                self.insert_item(
745                    item_id,
746                    log.oid,
747                    name.clone(),
748                    CatalogItem::Log(Log {
749                        variant: log.variant,
750                        global_id,
751                    }),
752                    MZ_SYSTEM_ROLE_ID,
753                    PrivilegeMap::from_mz_acl_items(acl_items),
754                );
755            }
756
757            Builtin::Table(table) => {
758                let mut acl_items = vec![rbac::owner_privilege(
759                    mz_sql::catalog::ObjectType::Table,
760                    MZ_SYSTEM_ROLE_ID,
761                )];
762                acl_items.extend_from_slice(&table.access);
763
764                self.insert_item(
765                    item_id,
766                    table.oid,
767                    name.clone(),
768                    CatalogItem::Table(Table {
769                        create_sql: None,
770                        desc: VersionedRelationDesc::new(table.desc.clone()),
771                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
772                        conn_id: None,
773                        resolved_ids: ResolvedIds::empty(),
774                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
775                            || {
776                                self.system_config()
777                                    .metrics_retention()
778                                    .try_into()
779                                    .expect("invalid metrics retention")
780                            },
781                        ),
782                        is_retained_metrics_object: table.is_retained_metrics_object,
783                        data_source: TableDataSource::TableWrites {
784                            defaults: vec![Expr::null(); table.desc.arity()],
785                        },
786                    }),
787                    MZ_SYSTEM_ROLE_ID,
788                    PrivilegeMap::from_mz_acl_items(acl_items),
789                );
790            }
791            Builtin::Index(index) => {
792                let custom_logical_compaction_window =
793                    index.is_retained_metrics_object.then(|| {
794                        self.system_config()
795                            .metrics_retention()
796                            .try_into()
797                            .expect("invalid metrics retention")
798                    });
799                // Indexes can't be versioned.
800                let versions = BTreeMap::new();
801
802                let item = self
803                    .parse_item(
804                        global_id,
805                        &index.create_sql(),
806                        &versions,
807                        None,
808                        index.is_retained_metrics_object,
809                        custom_logical_compaction_window,
810                        local_expression_cache,
811                        None,
812                    )
813                    .unwrap_or_else(|e| {
814                        panic!(
815                            "internal error: failed to load bootstrap index:\n\
816                                    {}\n\
817                                    error:\n\
818                                    {:?}\n\n\
819                                    make sure that the schema name is specified in the builtin index's create sql statement.",
820                            index.name, e
821                        )
822                    });
823                let CatalogItem::Index(_) = item else {
824                    panic!(
825                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
826                        index.name
827                    );
828                };
829
830                self.insert_item(
831                    item_id,
832                    index.oid,
833                    name,
834                    item,
835                    MZ_SYSTEM_ROLE_ID,
836                    PrivilegeMap::default(),
837                );
838            }
839            Builtin::View(_) => {
840                // parse_views is responsible for inserting all builtin views.
841                unreachable!("views added elsewhere");
842            }
843
844            // Note: Element types must be loaded before array types.
845            Builtin::Type(typ) => {
846                let typ = self.resolve_builtin_type_references(typ);
847                if let CatalogType::Array { element_reference } = typ.details.typ {
848                    let entry = self.get_entry_mut(&element_reference);
849                    let item_type = match &mut entry.item {
850                        CatalogItem::Type(item_type) => item_type,
851                        _ => unreachable!("types can only reference other types"),
852                    };
853                    item_type.details.array_id = Some(item_id);
854                }
855
856                let schema_id = self.resolve_system_schema(typ.schema);
857
858                self.insert_item(
859                    item_id,
860                    typ.oid,
861                    QualifiedItemName {
862                        qualifiers: ItemQualifiers {
863                            database_spec: ResolvedDatabaseSpecifier::Ambient,
864                            schema_spec: SchemaSpecifier::Id(schema_id),
865                        },
866                        item: typ.name.to_owned(),
867                    },
868                    CatalogItem::Type(Type {
869                        create_sql: None,
870                        global_id,
871                        details: typ.details.clone(),
872                        resolved_ids: ResolvedIds::empty(),
873                    }),
874                    MZ_SYSTEM_ROLE_ID,
875                    PrivilegeMap::from_mz_acl_items(vec![
876                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
877                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
878                    ]),
879                );
880            }
881
882            Builtin::Func(func) => {
883                // This OID is never used. `func` has a `Vec` of implementations and
884                // each implementation has its own OID. Those are the OIDs that are
885                // actually used by the system.
886                let oid = INVALID_OID;
887                self.insert_item(
888                    item_id,
889                    oid,
890                    name.clone(),
891                    CatalogItem::Func(Func {
892                        inner: func.inner,
893                        global_id,
894                    }),
895                    MZ_SYSTEM_ROLE_ID,
896                    PrivilegeMap::default(),
897                );
898            }
899
900            Builtin::Source(coll) => {
901                let mut acl_items = vec![rbac::owner_privilege(
902                    mz_sql::catalog::ObjectType::Source,
903                    MZ_SYSTEM_ROLE_ID,
904                )];
905                acl_items.extend_from_slice(&coll.access);
906
907                self.insert_item(
908                    item_id,
909                    coll.oid,
910                    name.clone(),
911                    CatalogItem::Source(Source {
912                        create_sql: None,
913                        data_source: DataSourceDesc::Introspection(coll.data_source),
914                        desc: coll.desc.clone(),
915                        global_id,
916                        timeline: Timeline::EpochMilliseconds,
917                        resolved_ids: ResolvedIds::empty(),
918                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
919                            || {
920                                self.system_config()
921                                    .metrics_retention()
922                                    .try_into()
923                                    .expect("invalid metrics retention")
924                            },
925                        ),
926                        is_retained_metrics_object: coll.is_retained_metrics_object,
927                    }),
928                    MZ_SYSTEM_ROLE_ID,
929                    PrivilegeMap::from_mz_acl_items(acl_items),
930                );
931            }
932            Builtin::ContinualTask(ct) => {
933                let mut acl_items = vec![rbac::owner_privilege(
934                    mz_sql::catalog::ObjectType::Source,
935                    MZ_SYSTEM_ROLE_ID,
936                )];
937                acl_items.extend_from_slice(&ct.access);
938                // Continual Tasks can't be versioned.
939                let versions = BTreeMap::new();
940
941                let item = self
942                    .parse_item(
943                        global_id,
944                        &ct.create_sql(),
945                        &versions,
946                        None,
947                        false,
948                        None,
949                        local_expression_cache,
950                        None,
951                    )
952                    .unwrap_or_else(|e| {
953                        panic!(
954                            "internal error: failed to load bootstrap continual task:\n\
955                                    {}\n\
956                                    error:\n\
957                                    {:?}\n\n\
958                                    make sure that the schema name is specified in the builtin continual task's create sql statement.",
959                            ct.name, e
960                        )
961                    });
962                let CatalogItem::ContinualTask(_) = &item else {
963                    panic!(
964                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
965                        ct.name
966                    );
967                };
968
969                self.insert_item(
970                    item_id,
971                    ct.oid,
972                    name,
973                    item,
974                    MZ_SYSTEM_ROLE_ID,
975                    PrivilegeMap::from_mz_acl_items(acl_items),
976                );
977            }
978            Builtin::Connection(connection) => {
979                // Connections can't be versioned.
980                let versions = BTreeMap::new();
981                let mut item = self
982                    .parse_item(
983                        global_id,
984                        connection.sql,
985                        &versions,
986                        None,
987                        false,
988                        None,
989                        local_expression_cache,
990                        None,
991                    )
992                    .unwrap_or_else(|e| {
993                        panic!(
994                            "internal error: failed to load bootstrap connection:\n\
995                                    {}\n\
996                                    error:\n\
997                                    {:?}\n\n\
998                                    make sure that the schema name is specified in the builtin connection's create sql statement.",
999                            connection.name, e
1000                        )
1001                    });
1002                let CatalogItem::Connection(_) = &mut item else {
1003                    panic!(
1004                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
1005                        connection.name
1006                    );
1007                };
1008
1009                let mut acl_items = vec![rbac::owner_privilege(
1010                    mz_sql::catalog::ObjectType::Connection,
1011                    connection.owner_id.clone(),
1012                )];
1013                acl_items.extend_from_slice(connection.access);
1014
1015                self.insert_item(
1016                    item_id,
1017                    connection.oid,
1018                    name.clone(),
1019                    item,
1020                    connection.owner_id.clone(),
1021                    PrivilegeMap::from_mz_acl_items(acl_items),
1022                );
1023            }
1024        }
1025    }
1026
1027    #[instrument(level = "debug")]
1028    fn apply_temporary_item_update(
1029        &mut self,
1030        temporary_item: TemporaryItem,
1031        diff: StateDiff,
1032        retractions: &mut InProgressRetractions,
1033        local_expression_cache: &mut LocalExpressionCache,
1034    ) {
1035        match diff {
1036            StateDiff::Addition => {
1037                let TemporaryItem {
1038                    id,
1039                    oid,
1040                    global_id,
1041                    schema_id,
1042                    name,
1043                    conn_id,
1044                    create_sql,
1045                    owner_id,
1046                    privileges,
1047                    extra_versions,
1048                } = temporary_item;
1049                let schema = self.find_temp_schema(&schema_id);
1050                let name = QualifiedItemName {
1051                    qualifiers: ItemQualifiers {
1052                        database_spec: schema.database().clone(),
1053                        schema_spec: schema.id().clone(),
1054                    },
1055                    item: name.clone(),
1056                };
1057
1058                let entry = match retractions.temp_items.remove(&id) {
1059                    Some(mut retraction) => {
1060                        assert_eq!(retraction.id, id);
1061
1062                        // We only reparse the SQL if it's changed. Otherwise, we use the existing
1063                        // item. This is a performance optimization and not needed for correctness.
1064                        // This makes it difficult to use the `UpdateFrom` trait, but the structure
1065                        // is still the same as the trait.
1066                        if retraction.create_sql() != create_sql {
1067                            let mut catalog_item = self
1068                                .deserialize_item(
1069                                    global_id,
1070                                    &create_sql,
1071                                    &extra_versions,
1072                                    local_expression_cache,
1073                                    Some(retraction.item),
1074                                )
1075                                .unwrap_or_else(|e| {
1076                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
1077                                });
1078                            // Have to patch up the item because parsing doesn't
1079                            // take into account temporary schemas/conn_id.
1080                            // NOTE(aljoscha): I don't like how we're patching
1081                            // this in here, but it's but one of the ways in
1082                            // which temporary items are a bit weird. So, here
1083                            // we are ...
1084                            catalog_item.set_conn_id(conn_id);
1085                            retraction.item = catalog_item;
1086                        }
1087
1088                        retraction.id = id;
1089                        retraction.oid = oid;
1090                        retraction.name = name;
1091                        retraction.owner_id = owner_id;
1092                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1093                        retraction
1094                    }
1095                    None => {
1096                        let mut catalog_item = self
1097                            .deserialize_item(
1098                                global_id,
1099                                &create_sql,
1100                                &extra_versions,
1101                                local_expression_cache,
1102                                None,
1103                            )
1104                            .unwrap_or_else(|e| {
1105                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1106                            });
1107
1108                        // Have to patch up the item because parsing doesn't
1109                        // take into account temporary schemas/conn_id.
1110                        // NOTE(aljoscha): I don't like how we're patching this
1111                        // in here, but it's but one of the ways in which
1112                        // temporary items are a bit weird. So, here we are ...
1113                        catalog_item.set_conn_id(conn_id);
1114
1115                        CatalogEntry {
1116                            item: catalog_item,
1117                            referenced_by: Vec::new(),
1118                            used_by: Vec::new(),
1119                            id,
1120                            oid,
1121                            name,
1122                            owner_id,
1123                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1124                        }
1125                    }
1126                };
1127                self.insert_entry(entry);
1128            }
1129            StateDiff::Retraction => {
1130                let entry = self.drop_item(temporary_item.id);
1131                retractions.temp_items.insert(temporary_item.id, entry);
1132            }
1133        }
1134    }
1135
1136    #[instrument(level = "debug")]
1137    fn apply_item_update(
1138        &mut self,
1139        item: mz_catalog::durable::Item,
1140        diff: StateDiff,
1141        retractions: &mut InProgressRetractions,
1142        local_expression_cache: &mut LocalExpressionCache,
1143    ) -> Result<(), CatalogError> {
1144        match diff {
1145            StateDiff::Addition => {
1146                let key = item.key();
1147                let mz_catalog::durable::Item {
1148                    id,
1149                    oid,
1150                    global_id,
1151                    schema_id,
1152                    name,
1153                    create_sql,
1154                    owner_id,
1155                    privileges,
1156                    extra_versions,
1157                } = item;
1158                let schema = self.find_non_temp_schema(&schema_id);
1159                let name = QualifiedItemName {
1160                    qualifiers: ItemQualifiers {
1161                        database_spec: schema.database().clone(),
1162                        schema_spec: schema.id().clone(),
1163                    },
1164                    item: name.clone(),
1165                };
1166                let entry = match retractions.items.remove(&key) {
1167                    Some(retraction) => {
1168                        assert_eq!(retraction.id, item.id);
1169
1170                        let item = self
1171                            .deserialize_item(
1172                                global_id,
1173                                &create_sql,
1174                                &extra_versions,
1175                                local_expression_cache,
1176                                Some(retraction.item),
1177                            )
1178                            .unwrap_or_else(|e| {
1179                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1180                            });
1181
1182                        CatalogEntry {
1183                            item,
1184                            id,
1185                            oid,
1186                            name,
1187                            owner_id,
1188                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1189                            referenced_by: retraction.referenced_by,
1190                            used_by: retraction.used_by,
1191                        }
1192                    }
1193                    None => {
1194                        let catalog_item = self
1195                            .deserialize_item(
1196                                global_id,
1197                                &create_sql,
1198                                &extra_versions,
1199                                local_expression_cache,
1200                                None,
1201                            )
1202                            .unwrap_or_else(|e| {
1203                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1204                            });
1205                        CatalogEntry {
1206                            item: catalog_item,
1207                            referenced_by: Vec::new(),
1208                            used_by: Vec::new(),
1209                            id,
1210                            oid,
1211                            name,
1212                            owner_id,
1213                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1214                        }
1215                    }
1216                };
1217
1218                self.insert_entry(entry);
1219            }
1220            StateDiff::Retraction => {
1221                let entry = self.drop_item(item.id);
1222                let key = item.into_key_value().0;
1223                retractions.items.insert(key, entry);
1224            }
1225        }
1226        Ok(())
1227    }
1228
1229    #[instrument(level = "debug")]
1230    fn apply_comment_update(
1231        &mut self,
1232        comment: mz_catalog::durable::Comment,
1233        diff: StateDiff,
1234        _retractions: &mut InProgressRetractions,
1235    ) {
1236        match diff {
1237            StateDiff::Addition => {
1238                let prev = self.comments.update_comment(
1239                    comment.object_id,
1240                    comment.sub_component,
1241                    Some(comment.comment),
1242                );
1243                assert_eq!(
1244                    prev, None,
1245                    "values must be explicitly retracted before inserting a new value"
1246                );
1247            }
1248            StateDiff::Retraction => {
1249                let prev =
1250                    self.comments
1251                        .update_comment(comment.object_id, comment.sub_component, None);
1252                assert_eq!(
1253                    prev,
1254                    Some(comment.comment),
1255                    "retraction does not match existing value: ({:?}, {:?})",
1256                    comment.object_id,
1257                    comment.sub_component,
1258                );
1259            }
1260        }
1261    }
1262
1263    #[instrument(level = "debug")]
1264    fn apply_source_references_update(
1265        &mut self,
1266        source_references: mz_catalog::durable::SourceReferences,
1267        diff: StateDiff,
1268        _retractions: &mut InProgressRetractions,
1269    ) {
1270        match diff {
1271            StateDiff::Addition => {
1272                let prev = self
1273                    .source_references
1274                    .insert(source_references.source_id, source_references.into());
1275                assert!(
1276                    prev.is_none(),
1277                    "values must be explicitly retracted before inserting a new value: {prev:?}"
1278                );
1279            }
1280            StateDiff::Retraction => {
1281                let prev = self.source_references.remove(&source_references.source_id);
1282                assert!(
1283                    prev.is_some(),
1284                    "retraction for a non-existent existing value: {source_references:?}"
1285                );
1286            }
1287        }
1288    }
1289
1290    #[instrument(level = "debug")]
1291    fn apply_storage_collection_metadata_update(
1292        &mut self,
1293        storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1294        diff: StateDiff,
1295        _retractions: &mut InProgressRetractions,
1296    ) {
1297        apply_inverted_lookup(
1298            &mut self.storage_metadata.collection_metadata,
1299            &storage_collection_metadata.id,
1300            storage_collection_metadata.shard,
1301            diff,
1302        );
1303    }
1304
1305    #[instrument(level = "debug")]
1306    fn apply_unfinalized_shard_update(
1307        &mut self,
1308        unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1309        diff: StateDiff,
1310        _retractions: &mut InProgressRetractions,
1311    ) {
1312        match diff {
1313            StateDiff::Addition => {
1314                let newly_inserted = self
1315                    .storage_metadata
1316                    .unfinalized_shards
1317                    .insert(unfinalized_shard.shard);
1318                assert!(
1319                    newly_inserted,
1320                    "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1321                );
1322            }
1323            StateDiff::Retraction => {
1324                let removed = self
1325                    .storage_metadata
1326                    .unfinalized_shards
1327                    .remove(&unfinalized_shard.shard);
1328                assert!(
1329                    removed,
1330                    "retraction does not match existing value: {unfinalized_shard:?}"
1331                );
1332            }
1333        }
1334    }
1335
1336    /// Generate a list of `BuiltinTableUpdate`s that correspond to a list of updates made to the
1337    /// durable catalog.
1338    #[instrument]
1339    pub(crate) fn generate_builtin_table_updates(
1340        &self,
1341        updates: Vec<StateUpdate>,
1342    ) -> Vec<BuiltinTableUpdate> {
1343        let mut builtin_table_updates = Vec::new();
1344        for StateUpdate { kind, ts: _, diff } in updates {
1345            let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1346            let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1347            builtin_table_updates.extend(builtin_table_update);
1348        }
1349        builtin_table_updates
1350    }
1351
1352    /// Generate a list of `BuiltinTableUpdate`s that correspond to a single update made to the
1353    /// durable catalog.
1354    #[instrument(level = "debug")]
1355    pub(crate) fn generate_builtin_table_update(
1356        &self,
1357        kind: StateUpdateKind,
1358        diff: StateDiff,
1359    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1360        let diff = diff.into();
1361        match kind {
1362            StateUpdateKind::Role(role) => {
1363                let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1364                for group_id in role.membership.map.keys() {
1365                    builtin_table_updates
1366                        .push(self.pack_role_members_update(*group_id, role.id, diff))
1367                }
1368                builtin_table_updates
1369            }
1370            StateUpdateKind::RoleAuth(role_auth) => {
1371                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1372            }
1373            StateUpdateKind::Database(database) => {
1374                vec![self.pack_database_update(&database.id, diff)]
1375            }
1376            StateUpdateKind::Schema(schema) => {
1377                let db_spec = schema.database_id.into();
1378                vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1379            }
1380            StateUpdateKind::DefaultPrivilege(default_privilege) => {
1381                vec![self.pack_default_privileges_update(
1382                    &default_privilege.object,
1383                    &default_privilege.acl_item.grantee,
1384                    &default_privilege.acl_item.acl_mode,
1385                    diff,
1386                )]
1387            }
1388            StateUpdateKind::SystemPrivilege(system_privilege) => {
1389                vec![self.pack_system_privileges_update(system_privilege, diff)]
1390            }
1391            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1392            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1393            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1394                self.pack_item_update(introspection_source_index.item_id, diff)
1395            }
1396            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1397                cluster_replica.cluster_id,
1398                &cluster_replica.name,
1399                diff,
1400            ),
1401            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1402                // Runtime-alterable system objects have real entries in the
1403                // items collection and so get handled through the normal
1404                // `StateUpdateKind::Item`.`
1405                if !system_object_mapping.unique_identifier.runtime_alterable() {
1406                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1407                } else {
1408                    vec![]
1409                }
1410            }
1411            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1412            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1413            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1414                comment.object_id,
1415                comment.sub_component,
1416                &comment.comment,
1417                diff,
1418            )],
1419            StateUpdateKind::SourceReferences(source_references) => {
1420                self.pack_source_references_update(&source_references, diff)
1421            }
1422            StateUpdateKind::AuditLog(audit_log) => {
1423                vec![
1424                    self.pack_audit_log_update(&audit_log.event, diff)
1425                        .expect("could not pack audit log update"),
1426                ]
1427            }
1428            StateUpdateKind::NetworkPolicy(policy) => self
1429                .pack_network_policy_update(&policy.id, diff)
1430                .expect("could not pack audit log update"),
1431            StateUpdateKind::StorageCollectionMetadata(_)
1432            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1433        }
1434    }
1435
1436    fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1437        self.entry_by_id
1438            .get_mut(id)
1439            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1440    }
1441
1442    fn get_schema_mut(
1443        &mut self,
1444        database_spec: &ResolvedDatabaseSpecifier,
1445        schema_spec: &SchemaSpecifier,
1446        conn_id: &ConnectionId,
1447    ) -> &mut Schema {
1448        // Keep in sync with `get_schemas`
1449        match (database_spec, schema_spec) {
1450            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1451                .temporary_schemas
1452                .get_mut(conn_id)
1453                .expect("catalog out of sync"),
1454            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1455                .ambient_schemas_by_id
1456                .get_mut(id)
1457                .expect("catalog out of sync"),
1458            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1459                .database_by_id
1460                .get_mut(database_id)
1461                .expect("catalog out of sync")
1462                .schemas_by_id
1463                .get_mut(schema_id)
1464                .expect("catalog out of sync"),
1465            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1466                unreachable!("temporary schemas are in the ambient database")
1467            }
1468        }
1469    }
1470
    /// Install builtin views to the catalog. This is its own function so that views can be
    /// optimized in parallel.
    ///
    /// The implementation is similar to `apply_updates_for_bootstrap` and determines dependency
    /// problems by sniffing out specific errors and then retrying once those dependencies are
    /// complete. This doesn't work for everything (casts, function implementations) so we also need
    /// to have a bucket for everything at the end. Additionally, because this executes in parallel,
    /// we must maintain a completed set otherwise races could result in orphaned views languishing
    /// in awaiting with nothing retriggering the attempt.
    ///
    /// Views whose `CatalogItemId` has a pending entry in `retractions.system_object_mappings`
    /// are re-inserted from that retracted entry without being parsed again; all other views
    /// are parsed from their `create_sql` in spawned tasks.
    ///
    /// Returns the builtin table updates (with `Diff::ONE`) for every re-inserted and every
    /// newly parsed view.
    ///
    /// # Panics
    ///
    /// Panics if parsing a view fails with an error other than the dependency/cast errors that
    /// are handled by retrying, if a view's schema is not a known ambient schema, or if any
    /// view is still waiting on a dependency once the loop finishes.
    #[instrument(name = "catalog::parse_views")]
    async fn parse_builtin_views(
        state: &mut CatalogState,
        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
        // Split the views into those with a matching in-progress retraction (`updates`, which
        // carry the retracted entry) and those that must be parsed from scratch (`additions`).
        let (updates, additions): (Vec<_>, Vec<_>) =
            builtin_views
                .into_iter()
                .partition_map(|(view, item_id, gid)| {
                    match retractions.system_object_mappings.remove(&item_id) {
                        Some(entry) => Either::Left(entry),
                        None => Either::Right((view, item_id, gid)),
                    }
                });

        for entry in updates {
            // This implies that we updated the fingerprint for some builtin view. The retraction
            // was parsed, planned, and optimized using the compiled in definition, not the
            // definition from a previous version. So we can just stick the old entry back into the
            // catalog.
            let item_id = entry.id();
            state.insert_entry(entry);
            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
        }

        // In-flight parse tasks.
        let mut handles = Vec::new();
        // Views blocked on a dependency, keyed by the missing dependency's ID or full name.
        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
            BTreeMap::new();
        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
        // Some errors are due to the implementation of casts or SQL functions that depend on some
        // view. Instead of figuring out the exact view dependency, delay these until the end.
        let mut awaiting_all = Vec::new();
        // Completed views, needed to avoid race conditions.
        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
        let mut completed_names: BTreeSet<String> = BTreeSet::new();

        // Avoid some reference lifetime issues by not passing `builtin` into the spawned task.
        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
            .into_iter()
            .map(|(view, item_id, gid)| (item_id, (view, gid)))
            .collect();
        // Remember every new view's ID up front; `views` is drained as views complete.
        let item_ids: Vec<_> = views.keys().copied().collect();

        // Initially every view is attempted; failures move a view into one of the awaiting
        // buckets until it is re-enqueued.
        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
            if handles.is_empty() && ready.is_empty() {
                // Enqueue the views that were waiting for all the others.
                // NOTE(review): a view that keeps failing with `InvalidCast` after every other
                // view has completed would be re-enqueued here indefinitely; confirm this
                // cannot happen for builtin views.
                ready.extend(awaiting_all.drain(..));
            }

            // Spawn tasks for all ready views.
            if !ready.is_empty() {
                // All tasks spawned in this batch share one snapshot of the catalog state, so
                // views inserted after this point are not visible to them.
                let spawn_state = Arc::new(state.clone());
                while let Some(id) = ready.pop_front() {
                    let (view, global_id) = views.get(&id).expect("must exist");
                    let global_id = *global_id;
                    let create_sql = view.create_sql();
                    // Views can't be versioned.
                    let versions = BTreeMap::new();

                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
                    let task_state = Arc::clone(&spawn_state);
                    // Take the cached expression (if any) out of the cache and hand it to the
                    // task; the error arms below put it back so a retry can use it again.
                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
                    let handle = mz_ore::task::spawn(
                        || "parse view",
                        async move {
                            let res = task_state.parse_item_inner(
                                global_id,
                                &create_sql,
                                &versions,
                                None,
                                false,
                                None,
                                cached_expr,
                                None,
                            );
                            (id, global_id, res)
                        }
                        .instrument(span),
                    );
                    handles.push(handle);
                }
            }

            // Wait for a view to be ready.
            // `handles` is non-empty here: the loop condition guarantees that `handles`,
            // `ready`, or `awaiting_all` is non-empty, and the two blocks above turn any
            // non-empty `awaiting_all`/`ready` into spawned handles.
            let (selected, _idx, remaining) = future::select_all(handles).await;
            handles = remaining;
            let (id, global_id, res) = selected;
            // Returns a cached expression handed back by a failed attempt to the cache.
            let mut insert_cached_expr = |cached_expr| {
                if let Some(cached_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
                }
            };
            match res {
                Ok((item, uncached_expr)) => {
                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                        local_expression_cache.insert_uncached_expression(
                            global_id,
                            uncached_expr,
                            optimizer_features,
                        );
                    }
                    // Add item to catalog.
                    let (view, _gid) = views.remove(&id).expect("must exist");
                    let schema_id = state
                        .ambient_schemas_by_name
                        .get(view.schema)
                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
                    let qname = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(*schema_id),
                        },
                        item: view.name.into(),
                    };
                    // The system role's owner privilege comes first, followed by the access
                    // list declared on the builtin view.
                    let mut acl_items = vec![rbac::owner_privilege(
                        mz_sql::catalog::ObjectType::View,
                        MZ_SYSTEM_ROLE_ID,
                    )];
                    acl_items.extend_from_slice(&view.access);

                    state.insert_item(
                        id,
                        view.oid,
                        qname,
                        item,
                        MZ_SYSTEM_ROLE_ID,
                        PrivilegeMap::from_mz_acl_items(acl_items),
                    );

                    // Enqueue any items waiting on this dependency.
                    let mut resolved_dependent_items = Vec::new();
                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    let entry = state.get_entry(&id);
                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    ready.extend(resolved_dependent_items);

                    // Record completion so that views failing on this dependency later (because
                    // they ran against an older state snapshot) are retried immediately instead
                    // of being parked.
                    completed_ids.insert(id);
                    completed_names.insert(full_name);
                }
                // If we were missing a dependency, wait for it to be added.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    if completed_ids.contains(&missing_dep) {
                        ready.push_back(id);
                    } else {
                        awaiting_id_dependencies
                            .entry(missing_dep)
                            .or_default()
                            .push(id);
                    }
                }
                // If we were missing a dependency, wait for it to be added.
                Err((
                    AdapterError::PlanError(plan::PlanError::Catalog(
                        SqlCatalogError::UnknownItem(missing_dep),
                    )),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    // The unknown item is reported as a string: treat it as an ID when it parses
                    // as a `CatalogItemId`, otherwise as a name.
                    match CatalogItemId::from_str(&missing_dep) {
                        Ok(missing_dep) => {
                            if completed_ids.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_id_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                        Err(_) => {
                            if completed_names.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_name_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                    }
                }
                // Cast errors don't name the missing view; retry once everything else is done.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    awaiting_all.push(id);
                }
                // Any other error is not retried and aborts with a panic.
                Err((e, _)) => {
                    let (bad_view, _gid) = views.get(&id).expect("must exist");
                    panic!(
                        "internal error: failed to load bootstrap view:\n\
                            {name}\n\
                            error:\n\
                            {e:?}\n\n\
                            Make sure that the schema name is specified in the builtin view's create sql statement.
                            ",
                        name = bad_view.name,
                    )
                }
            }
        }

        // Every view must have been installed; nothing may still be parked.
        assert!(awaiting_id_dependencies.is_empty());
        assert!(
            awaiting_name_dependencies.is_empty(),
            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
        );
        assert!(awaiting_all.is_empty());
        assert!(views.is_empty());

        // Generate a builtin table update for all the new views.
        builtin_table_updates.extend(
            item_ids
                .into_iter()
                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
        );

        builtin_table_updates
    }
1714
1715    /// Associates a name, `CatalogItemId`, and entry.
1716    fn insert_entry(&mut self, entry: CatalogEntry) {
1717        if !entry.id.is_system() {
1718            if let Some(cluster_id) = entry.item.cluster_id() {
1719                self.clusters_by_id
1720                    .get_mut(&cluster_id)
1721                    .expect("catalog out of sync")
1722                    .bound_objects
1723                    .insert(entry.id);
1724            };
1725        }
1726
1727        for u in entry.references().items() {
1728            match self.entry_by_id.get_mut(u) {
1729                Some(metadata) => metadata.referenced_by.push(entry.id()),
1730                None => panic!(
1731                    "Catalog: missing dependent catalog item {} while installing {}",
1732                    &u,
1733                    self.resolve_full_name(entry.name(), entry.conn_id())
1734                ),
1735            }
1736        }
1737        for u in entry.uses() {
1738            // Ignore self for self-referential tasks (e.g. Continual Tasks), if
1739            // present.
1740            if u == entry.id() {
1741                continue;
1742            }
1743            match self.entry_by_id.get_mut(&u) {
1744                Some(metadata) => metadata.used_by.push(entry.id()),
1745                None => panic!(
1746                    "Catalog: missing dependent catalog item {} while installing {}",
1747                    &u,
1748                    self.resolve_full_name(entry.name(), entry.conn_id())
1749                ),
1750            }
1751        }
1752        for gid in entry.item.global_ids() {
1753            self.entry_by_global_id.insert(gid, entry.id());
1754        }
1755        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1756        let schema = self.get_schema_mut(
1757            &entry.name().qualifiers.database_spec,
1758            &entry.name().qualifiers.schema_spec,
1759            conn_id,
1760        );
1761
1762        let prev_id = match entry.item() {
1763            CatalogItem::Func(_) => schema
1764                .functions
1765                .insert(entry.name().item.clone(), entry.id()),
1766            CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1767            _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1768        };
1769
1770        assert!(
1771            prev_id.is_none(),
1772            "builtin name collision on {:?}",
1773            entry.name().item.clone()
1774        );
1775
1776        self.entry_by_id.insert(entry.id(), entry.clone());
1777    }
1778
1779    /// Associates a name, [`CatalogItemId`], and entry.
1780    fn insert_item(
1781        &mut self,
1782        id: CatalogItemId,
1783        oid: u32,
1784        name: QualifiedItemName,
1785        item: CatalogItem,
1786        owner_id: RoleId,
1787        privileges: PrivilegeMap,
1788    ) {
1789        let entry = CatalogEntry {
1790            item,
1791            name,
1792            id,
1793            oid,
1794            used_by: Vec::new(),
1795            referenced_by: Vec::new(),
1796            owner_id,
1797            privileges,
1798        };
1799
1800        self.insert_entry(entry);
1801    }
1802
1803    #[mz_ore::instrument(level = "trace")]
1804    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1805        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1806        for u in metadata.references().items() {
1807            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1808                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1809            }
1810        }
1811        for u in metadata.uses() {
1812            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1813                dep_metadata.used_by.retain(|u| *u != metadata.id())
1814            }
1815        }
1816        for gid in metadata.global_ids() {
1817            self.entry_by_global_id.remove(&gid);
1818        }
1819
1820        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1821        let schema = self.get_schema_mut(
1822            &metadata.name().qualifiers.database_spec,
1823            &metadata.name().qualifiers.schema_spec,
1824            conn_id,
1825        );
1826        if metadata.item_type() == CatalogItemType::Type {
1827            schema
1828                .types
1829                .remove(&metadata.name().item)
1830                .expect("catalog out of sync");
1831        } else {
1832            // Functions would need special handling, but we don't yet support
1833            // dropping functions.
1834            assert_ne!(metadata.item_type(), CatalogItemType::Func);
1835
1836            schema
1837                .items
1838                .remove(&metadata.name().item)
1839                .expect("catalog out of sync");
1840        };
1841
1842        if !id.is_system() {
1843            if let Some(cluster_id) = metadata.item().cluster_id() {
1844                assert!(
1845                    self.clusters_by_id
1846                        .get_mut(&cluster_id)
1847                        .expect("catalog out of sync")
1848                        .bound_objects
1849                        .remove(&id),
1850                    "catalog out of sync"
1851                );
1852            }
1853        }
1854
1855        metadata
1856    }
1857
1858    fn insert_introspection_source_index(
1859        &mut self,
1860        cluster_id: ClusterId,
1861        log: &'static BuiltinLog,
1862        item_id: CatalogItemId,
1863        global_id: GlobalId,
1864        oid: u32,
1865    ) {
1866        let (index_name, index) =
1867            self.create_introspection_source_index(cluster_id, log, global_id);
1868        self.insert_item(
1869            item_id,
1870            oid,
1871            index_name,
1872            index,
1873            MZ_SYSTEM_ROLE_ID,
1874            PrivilegeMap::default(),
1875        );
1876    }
1877
1878    fn create_introspection_source_index(
1879        &self,
1880        cluster_id: ClusterId,
1881        log: &'static BuiltinLog,
1882        global_id: GlobalId,
1883    ) -> (QualifiedItemName, CatalogItem) {
1884        let source_name = FullItemName {
1885            database: RawDatabaseSpecifier::Ambient,
1886            schema: log.schema.into(),
1887            item: log.name.into(),
1888        };
1889        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1890        let mut index_name = QualifiedItemName {
1891            qualifiers: ItemQualifiers {
1892                database_spec: ResolvedDatabaseSpecifier::Ambient,
1893                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1894            },
1895            item: index_name.clone(),
1896        };
1897        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1898        let index_item_name = index_name.item.clone();
1899        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1900        let index = CatalogItem::Index(Index {
1901            global_id,
1902            on: log_global_id,
1903            keys: log
1904                .variant
1905                .index_by()
1906                .into_iter()
1907                .map(MirScalarExpr::column)
1908                .collect(),
1909            create_sql: index_sql(
1910                index_item_name,
1911                cluster_id,
1912                source_name,
1913                &log.variant.desc(),
1914                &log.variant.index_by(),
1915            ),
1916            conn_id: None,
1917            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1918            cluster_id,
1919            is_retained_metrics_object: false,
1920            custom_logical_compaction_window: None,
1921        });
1922        (index_name, index)
1923    }
1924
1925    /// Insert system configuration `name` with `value`.
1926    ///
1927    /// Return a `bool` value indicating whether the configuration was modified
1928    /// by the call.
1929    fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1930        Ok(self.system_configuration.set(name, value)?)
1931    }
1932
1933    /// Reset system configuration `name`.
1934    ///
1935    /// Return a `bool` value indicating whether the configuration was modified
1936    /// by the call.
1937    fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1938        Ok(self.system_configuration.reset(name)?)
1939    }
1940}
1941
1942/// Sort [`StateUpdate`]s in timestamp then dependency order
1943fn sort_updates(mut updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1944    let mut sorted_updates = Vec::with_capacity(updates.len());
1945
1946    updates.sort_by_key(|update| update.ts);
1947    for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
1948        let sorted_ts_updates = sort_updates_inner(updates.collect());
1949        sorted_updates.extend(sorted_ts_updates);
1950    }
1951
1952    sorted_updates
1953}
1954
1955/// Sort [`StateUpdate`]s in dependency order for a single timestamp.
1956fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1957    fn push_update<T>(
1958        update: T,
1959        diff: StateDiff,
1960        retractions: &mut Vec<T>,
1961        additions: &mut Vec<T>,
1962    ) {
1963        match diff {
1964            StateDiff::Retraction => retractions.push(update),
1965            StateDiff::Addition => additions.push(update),
1966        }
1967    }
1968
1969    soft_assert_no_log!(
1970        updates.iter().map(|update| update.ts).all_equal(),
1971        "all timestamps should be equal: {updates:?}"
1972    );
1973
1974    // Partition updates by type so that we can weave different update types into the right spots.
1975    let mut pre_cluster_retractions = Vec::new();
1976    let mut pre_cluster_additions = Vec::new();
1977    let mut cluster_retractions = Vec::new();
1978    let mut cluster_additions = Vec::new();
1979    let mut builtin_item_updates = Vec::new();
1980    let mut item_retractions = Vec::new();
1981    let mut item_additions = Vec::new();
1982    let mut temp_item_retractions = Vec::new();
1983    let mut temp_item_additions = Vec::new();
1984    let mut post_item_retractions = Vec::new();
1985    let mut post_item_additions = Vec::new();
1986    for update in updates {
1987        let diff = update.diff.clone();
1988        match update.kind {
1989            StateUpdateKind::Role(_)
1990            | StateUpdateKind::RoleAuth(_)
1991            | StateUpdateKind::Database(_)
1992            | StateUpdateKind::Schema(_)
1993            | StateUpdateKind::DefaultPrivilege(_)
1994            | StateUpdateKind::SystemPrivilege(_)
1995            | StateUpdateKind::SystemConfiguration(_)
1996            | StateUpdateKind::NetworkPolicy(_) => push_update(
1997                update,
1998                diff,
1999                &mut pre_cluster_retractions,
2000                &mut pre_cluster_additions,
2001            ),
2002            StateUpdateKind::Cluster(_)
2003            | StateUpdateKind::IntrospectionSourceIndex(_)
2004            | StateUpdateKind::ClusterReplica(_) => push_update(
2005                update,
2006                diff,
2007                &mut cluster_retractions,
2008                &mut cluster_additions,
2009            ),
2010            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
2011                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
2012            }
2013            StateUpdateKind::TemporaryItem(item) => push_update(
2014                (item, update.ts, update.diff),
2015                diff,
2016                &mut temp_item_retractions,
2017                &mut temp_item_additions,
2018            ),
2019            StateUpdateKind::Item(item) => push_update(
2020                (item, update.ts, update.diff),
2021                diff,
2022                &mut item_retractions,
2023                &mut item_additions,
2024            ),
2025            StateUpdateKind::Comment(_)
2026            | StateUpdateKind::SourceReferences(_)
2027            | StateUpdateKind::AuditLog(_)
2028            | StateUpdateKind::StorageCollectionMetadata(_)
2029            | StateUpdateKind::UnfinalizedShard(_) => push_update(
2030                update,
2031                diff,
2032                &mut post_item_retractions,
2033                &mut post_item_additions,
2034            ),
2035        }
2036    }
2037
2038    // Sort builtin item updates by dependency.
2039    let builtin_item_updates = builtin_item_updates
2040        .into_iter()
2041        .map(|(system_object_mapping, ts, diff)| {
2042            let idx = BUILTIN_LOOKUP
2043                .get(&system_object_mapping.description)
2044                .expect("missing builtin")
2045                .0;
2046            (idx, system_object_mapping, ts, diff)
2047        })
2048        .sorted_by_key(|(idx, _, _, _)| *idx)
2049        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2050
2051    // Further partition builtin item updates.
2052    let mut other_builtin_retractions = Vec::new();
2053    let mut other_builtin_additions = Vec::new();
2054    let mut builtin_index_retractions = Vec::new();
2055    let mut builtin_index_additions = Vec::new();
2056    for (builtin_item_update, ts, diff) in builtin_item_updates {
2057        match &builtin_item_update.description.object_type {
2058            CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
2059                StateUpdate {
2060                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2061                    ts,
2062                    diff,
2063                },
2064                diff,
2065                &mut builtin_index_retractions,
2066                &mut builtin_index_additions,
2067            ),
2068            CatalogItemType::Table
2069            | CatalogItemType::Source
2070            | CatalogItemType::Sink
2071            | CatalogItemType::View
2072            | CatalogItemType::MaterializedView
2073            | CatalogItemType::Type
2074            | CatalogItemType::Func
2075            | CatalogItemType::Secret
2076            | CatalogItemType::Connection => push_update(
2077                StateUpdate {
2078                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2079                    ts,
2080                    diff,
2081                },
2082                diff,
2083                &mut other_builtin_retractions,
2084                &mut other_builtin_additions,
2085            ),
2086        }
2087    }
2088
2089    /// Sort `CONNECTION` items.
2090    ///
2091    /// `CONNECTION`s can depend on one another, e.g. a `KAFKA CONNECTION` contains
2092    /// a list of `BROKERS` and these broker definitions can themselves reference other
2093    /// connections. This `BROKERS` list can also be `ALTER`ed and thus it's possible
2094    /// for a connection to depend on another with an ID greater than its own.
2095    fn sort_connections(connections: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2096        let mut topo: BTreeMap<
2097            (mz_catalog::durable::Item, Timestamp, StateDiff),
2098            BTreeSet<CatalogItemId>,
2099        > = BTreeMap::default();
2100        let existing_connections: BTreeSet<_> = connections.iter().map(|item| item.0.id).collect();
2101
2102        // Initialize our set of topological sort.
2103        tracing::debug!(?connections, "sorting connections");
2104        for (connection, ts, diff) in connections.drain(..) {
2105            let statement = mz_sql::parse::parse(&connection.create_sql)
2106                .expect("valid CONNECTION create_sql")
2107                .into_element()
2108                .ast;
2109            let mut dependencies = mz_sql::names::dependencies(&statement)
2110                .expect("failed to find dependencies of CONNECTION");
2111            // Be defensive and remove any possible self references.
2112            dependencies.remove(&connection.id);
2113            // It's possible we're applying updates to a connection where the
2114            // dependency already exists and thus it's not in `connections`.
2115            dependencies.retain(|dep| existing_connections.contains(dep));
2116
2117            // Be defensive and ensure we're not clobbering any items.
2118            assert_none!(topo.insert((connection, ts, diff), dependencies));
2119        }
2120        tracing::debug!(?topo, ?existing_connections, "built topological sort");
2121
2122        // Do a topological sort, pushing back into the provided Vec.
2123        while !topo.is_empty() {
2124            // Get all of the connections with no dependencies.
2125            let no_deps: Vec<_> = topo
2126                .iter()
2127                .filter_map(|(item, deps)| {
2128                    if deps.is_empty() {
2129                        Some(item.clone())
2130                    } else {
2131                        None
2132                    }
2133                })
2134                .collect();
2135
2136            // Cycle in our graph!
2137            if no_deps.is_empty() {
2138                panic!("programming error, cycle in Connections");
2139            }
2140
2141            // Process all of the items with no dependencies.
2142            for item in no_deps {
2143                // Remove the item from our topological sort.
2144                topo.remove(&item);
2145                // Remove this item from anything that depends on it.
2146                topo.values_mut().for_each(|deps| {
2147                    deps.remove(&item.0.id);
2148                });
2149                // Push it back into our list as "completed".
2150                connections.push(item);
2151            }
2152        }
2153    }
2154
2155    /// Sort item updates by dependency.
2156    ///
2157    /// First we group items into groups that are totally ordered by dependency. For example, when
2158    /// sorting all items by dependency we know that all tables can come after all sources, because
2159    /// a source can never depend on a table. Within these groups, the ID order matches the
2160    /// dependency order.
2161    ///
2162    /// It used to be the case that the ID order of ALL items matched the dependency order. However,
2163    /// certain migrations shuffled item IDs around s.t. this was no longer true. A much better
2164    /// approach would be to investigate each item, discover their exact dependencies, and then
2165    /// perform a topological sort. This is non-trivial because we only have the CREATE SQL of each
2166    /// item here. Within the SQL the dependent items are sometimes referred to by ID and sometimes
2167    /// referred to by name.
2168    ///
2169    /// The logic of this function should match [`sort_temp_item_updates`].
2170    fn sort_item_updates(
2171        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2172    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2173        // Partition items into groups s.t. each item in one group has a predefined order with all
2174        // items in other groups. For example, all sinks are ordered greater than all tables.
2175        let mut types = Vec::new();
2176        // N.B. Functions can depend on system tables, but not user tables.
2177        // TODO(udf): This will change when UDFs are supported.
2178        let mut funcs = Vec::new();
2179        let mut secrets = Vec::new();
2180        let mut connections = Vec::new();
2181        let mut sources = Vec::new();
2182        let mut tables = Vec::new();
2183        let mut derived_items = Vec::new();
2184        let mut sinks = Vec::new();
2185        let mut continual_tasks = Vec::new();
2186
2187        for update in item_updates {
2188            match update.0.item_type() {
2189                CatalogItemType::Type => types.push(update),
2190                CatalogItemType::Func => funcs.push(update),
2191                CatalogItemType::Secret => secrets.push(update),
2192                CatalogItemType::Connection => connections.push(update),
2193                CatalogItemType::Source => sources.push(update),
2194                CatalogItemType::Table => tables.push(update),
2195                CatalogItemType::View
2196                | CatalogItemType::MaterializedView
2197                | CatalogItemType::Index => derived_items.push(update),
2198                CatalogItemType::Sink => sinks.push(update),
2199                CatalogItemType::ContinualTask => continual_tasks.push(update),
2200            }
2201        }
2202
2203        // Within each group, sort by ID.
2204        for group in [
2205            &mut types,
2206            &mut funcs,
2207            &mut secrets,
2208            &mut sources,
2209            &mut tables,
2210            &mut derived_items,
2211            &mut sinks,
2212            &mut continual_tasks,
2213        ] {
2214            group.sort_by_key(|(item, _, _)| item.id);
2215        }
2216
2217        // HACK(parkmycar): Connections are special and can depend on one another. Additionally
2218        // connections can be `ALTER`ed and thus a `CONNECTION` can depend on another whose ID
2219        // is greater than its own.
2220        sort_connections(&mut connections);
2221
2222        iter::empty()
2223            .chain(types)
2224            .chain(funcs)
2225            .chain(secrets)
2226            .chain(connections)
2227            .chain(sources)
2228            .chain(tables)
2229            .chain(derived_items)
2230            .chain(sinks)
2231            .chain(continual_tasks)
2232            .collect()
2233    }
2234
2235    let item_retractions = sort_item_updates(item_retractions);
2236    let item_additions = sort_item_updates(item_additions);
2237
2238    /// Sort temporary item updates by dependency.
2239    ///
2240    /// The logic of this function should match [`sort_item_updates`].
2241    fn sort_temp_item_updates(
2242        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2243    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2244        // Partition items into groups s.t. each item in one group has a predefined order with all
2245        // items in other groups. For example, all sinks are ordered greater than all tables.
2246        let mut types = Vec::new();
2247        // N.B. Functions can depend on system tables, but not user tables.
2248        let mut funcs = Vec::new();
2249        let mut secrets = Vec::new();
2250        let mut connections = Vec::new();
2251        let mut sources = Vec::new();
2252        let mut tables = Vec::new();
2253        let mut derived_items = Vec::new();
2254        let mut sinks = Vec::new();
2255        let mut continual_tasks = Vec::new();
2256
2257        for update in temp_item_updates {
2258            match update.0.item_type() {
2259                CatalogItemType::Type => types.push(update),
2260                CatalogItemType::Func => funcs.push(update),
2261                CatalogItemType::Secret => secrets.push(update),
2262                CatalogItemType::Connection => connections.push(update),
2263                CatalogItemType::Source => sources.push(update),
2264                CatalogItemType::Table => tables.push(update),
2265                CatalogItemType::View
2266                | CatalogItemType::MaterializedView
2267                | CatalogItemType::Index => derived_items.push(update),
2268                CatalogItemType::Sink => sinks.push(update),
2269                CatalogItemType::ContinualTask => continual_tasks.push(update),
2270            }
2271        }
2272
2273        // Within each group, sort by ID.
2274        for group in [
2275            &mut types,
2276            &mut funcs,
2277            &mut secrets,
2278            &mut connections,
2279            &mut sources,
2280            &mut tables,
2281            &mut derived_items,
2282            &mut sinks,
2283            &mut continual_tasks,
2284        ] {
2285            group.sort_by_key(|(item, _, _)| item.id);
2286        }
2287
2288        iter::empty()
2289            .chain(types)
2290            .chain(funcs)
2291            .chain(secrets)
2292            .chain(connections)
2293            .chain(sources)
2294            .chain(tables)
2295            .chain(derived_items)
2296            .chain(sinks)
2297            .chain(continual_tasks)
2298            .collect()
2299    }
2300    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2301    let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2302
2303    /// Merge sorted temporary and non-temp items.
2304    fn merge_item_updates(
2305        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2306        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2307    ) -> Vec<StateUpdate> {
2308        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2309
2310        while let (Some((item, _, _)), Some((temp_item, _, _))) =
2311            (item_updates.front(), temp_item_updates.front())
2312        {
2313            if item.id < temp_item.id {
2314                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2315                state_updates.push(StateUpdate {
2316                    kind: StateUpdateKind::Item(item),
2317                    ts,
2318                    diff,
2319                });
2320            } else if item.id > temp_item.id {
2321                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2322                state_updates.push(StateUpdate {
2323                    kind: StateUpdateKind::TemporaryItem(temp_item),
2324                    ts,
2325                    diff,
2326                });
2327            } else {
2328                unreachable!(
2329                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2330                );
2331            }
2332        }
2333
2334        while let Some((item, ts, diff)) = item_updates.pop_front() {
2335            state_updates.push(StateUpdate {
2336                kind: StateUpdateKind::Item(item),
2337                ts,
2338                diff,
2339            });
2340        }
2341
2342        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2343            state_updates.push(StateUpdate {
2344                kind: StateUpdateKind::TemporaryItem(temp_item),
2345                ts,
2346                diff,
2347            });
2348        }
2349
2350        state_updates
2351    }
2352    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2353    let item_additions = merge_item_updates(item_additions, temp_item_additions);
2354
2355    // Put everything back together.
2356    iter::empty()
2357        // All retractions must be reversed.
2358        .chain(post_item_retractions.into_iter().rev())
2359        .chain(item_retractions.into_iter().rev())
2360        .chain(builtin_index_retractions.into_iter().rev())
2361        .chain(cluster_retractions.into_iter().rev())
2362        .chain(other_builtin_retractions.into_iter().rev())
2363        .chain(pre_cluster_retractions.into_iter().rev())
2364        .chain(pre_cluster_additions)
2365        .chain(other_builtin_additions)
2366        .chain(cluster_additions)
2367        .chain(builtin_index_additions)
2368        .chain(item_additions)
2369        .chain(post_item_additions)
2370        .collect()
2371}
2372
/// Most updates are applied one at a time, but during bootstrap, certain types are applied
/// separately in a batch for performance reasons. A constraint is that updates must be applied in
/// order. This process is modeled as a state machine that batches then applies groups of updates.
///
/// Each variant carries the batch collected so far for its kind of update.
enum BootstrapApplyState {
    /// Additions of builtin views.
    ///
    /// Every entry pairs the static builtin view definition with the catalog item ID and global
    /// ID taken from the system object mapping that added it.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Item updates that aren't builtin view additions.
    ///
    /// This covers introspection source index updates, item updates, and every system object
    /// mapping update that is not the addition of a view.
    Items(Vec<StateUpdate>),
    /// All other updates.
    Updates(Vec<StateUpdate>),
}
2384
2385impl BootstrapApplyState {
2386    fn new(update: StateUpdate) -> BootstrapApplyState {
2387        match update {
2388            StateUpdate {
2389                kind: StateUpdateKind::SystemObjectMapping(system_object_mapping),
2390                diff: StateDiff::Addition,
2391                ..
2392            } if matches!(
2393                system_object_mapping.description.object_type,
2394                CatalogItemType::View
2395            ) =>
2396            {
2397                let view_addition = lookup_builtin_view_addition(system_object_mapping);
2398                BootstrapApplyState::BuiltinViewAdditions(vec![view_addition])
2399            }
2400            StateUpdate {
2401                kind: StateUpdateKind::IntrospectionSourceIndex(_),
2402                ..
2403            }
2404            | StateUpdate {
2405                kind: StateUpdateKind::SystemObjectMapping(_),
2406                ..
2407            }
2408            | StateUpdate {
2409                kind: StateUpdateKind::Item(_),
2410                ..
2411            } => BootstrapApplyState::Items(vec![update]),
2412            update => BootstrapApplyState::Updates(vec![update]),
2413        }
2414    }
2415
2416    /// Apply all updates that have been batched in `self`.
2417    ///
2418    /// We make sure to enable all "enable_for_item_parsing" feature flags when applying item
2419    /// updates during bootstrap. See [`CatalogState::with_enable_for_item_parsing`] for more
2420    /// details.
2421    async fn apply(
2422        self,
2423        state: &mut CatalogState,
2424        retractions: &mut InProgressRetractions,
2425        local_expression_cache: &mut LocalExpressionCache,
2426    ) -> (
2427        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2428        Vec<ParsedStateUpdate>,
2429    ) {
2430        match self {
2431            BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions) => {
2432                let restore = state.system_configuration.clone();
2433                state.system_configuration.enable_for_item_parsing();
2434                let builtin_table_updates = CatalogState::parse_builtin_views(
2435                    state,
2436                    builtin_view_additions,
2437                    retractions,
2438                    local_expression_cache,
2439                )
2440                .await;
2441                state.system_configuration = restore;
2442                (builtin_table_updates, Vec::new())
2443            }
2444            BootstrapApplyState::Items(updates) => state.with_enable_for_item_parsing(|state| {
2445                state
2446                    .apply_updates_inner(updates, retractions, local_expression_cache)
2447                    .expect("corrupt catalog")
2448            }),
2449            BootstrapApplyState::Updates(updates) => state
2450                .apply_updates_inner(updates, retractions, local_expression_cache)
2451                .expect("corrupt catalog"),
2452        }
2453    }
2454
2455    async fn step(
2456        self,
2457        next: BootstrapApplyState,
2458        state: &mut CatalogState,
2459        retractions: &mut InProgressRetractions,
2460        local_expression_cache: &mut LocalExpressionCache,
2461    ) -> (
2462        BootstrapApplyState,
2463        (
2464            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2465            Vec<ParsedStateUpdate>,
2466        ),
2467    ) {
2468        match (self, next) {
2469            (
2470                BootstrapApplyState::BuiltinViewAdditions(mut builtin_view_additions),
2471                BootstrapApplyState::BuiltinViewAdditions(next_builtin_view_additions),
2472            ) => {
2473                // Continue batching builtin view additions.
2474                builtin_view_additions.extend(next_builtin_view_additions);
2475                (
2476                    BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions),
2477                    (Vec::new(), Vec::new()),
2478                )
2479            }
2480            (BootstrapApplyState::Items(mut updates), BootstrapApplyState::Items(next_updates)) => {
2481                // Continue batching item updates.
2482                updates.extend(next_updates);
2483                (
2484                    BootstrapApplyState::Items(updates),
2485                    (Vec::new(), Vec::new()),
2486                )
2487            }
2488            (
2489                BootstrapApplyState::Updates(mut updates),
2490                BootstrapApplyState::Updates(next_updates),
2491            ) => {
2492                // Continue batching updates.
2493                updates.extend(next_updates);
2494                (
2495                    BootstrapApplyState::Updates(updates),
2496                    (Vec::new(), Vec::new()),
2497                )
2498            }
2499            (apply_state, next_apply_state) => {
2500                // Apply the current batch and start batching new apply state.
2501                let updates = apply_state
2502                    .apply(state, retractions, local_expression_cache)
2503                    .await;
2504                (next_apply_state, updates)
2505            }
2506        }
2507    }
2508}
2509
2510/// Helper method to updated inverted lookup maps. The keys are generally names and the values are
2511/// generally IDs.
2512///
2513/// Importantly, when retracting it's expected that the existing value will match `value` exactly.
2514fn apply_inverted_lookup<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: V, diff: StateDiff)
2515where
2516    K: Ord + Clone + Debug,
2517    V: PartialEq + Debug,
2518{
2519    match diff {
2520        StateDiff::Retraction => {
2521            let prev = map.remove(key);
2522            assert_eq!(
2523                prev,
2524                Some(value),
2525                "retraction does not match existing value: {key:?}"
2526            );
2527        }
2528        StateDiff::Addition => {
2529            let prev = map.insert(key.clone(), value);
2530            assert_eq!(
2531                prev, None,
2532                "values must be explicitly retracted before inserting a new value: {key:?}"
2533            );
2534        }
2535    }
2536}
2537
2538/// Helper method to update catalog state, that may need to be updated from a previously retracted
2539/// object.
2540fn apply_with_update<K, V, D>(
2541    map: &mut BTreeMap<K, V>,
2542    durable: D,
2543    key_fn: impl FnOnce(&D) -> K,
2544    diff: StateDiff,
2545    retractions: &mut BTreeMap<D::Key, V>,
2546) where
2547    K: Ord,
2548    V: UpdateFrom<D> + PartialEq + Debug,
2549    D: DurableType,
2550    D::Key: Ord,
2551{
2552    match diff {
2553        StateDiff::Retraction => {
2554            let mem_key = key_fn(&durable);
2555            let value = map
2556                .remove(&mem_key)
2557                .expect("retraction does not match existing value: {key:?}");
2558            let durable_key = durable.into_key_value().0;
2559            retractions.insert(durable_key, value);
2560        }
2561        StateDiff::Addition => {
2562            let mem_key = key_fn(&durable);
2563            let durable_key = durable.key();
2564            let value = match retractions.remove(&durable_key) {
2565                Some(mut retraction) => {
2566                    retraction.update_from(durable);
2567                    retraction
2568                }
2569                None => durable.into(),
2570            };
2571            let prev = map.insert(mem_key, value);
2572            assert_eq!(
2573                prev, None,
2574                "values must be explicitly retracted before inserting a new value"
2575            );
2576        }
2577    }
2578}
2579
2580/// Looks up a [`BuiltinView`] from a [`SystemObjectMapping`].
2581fn lookup_builtin_view_addition(
2582    mapping: SystemObjectMapping,
2583) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2584    let (_, builtin) = BUILTIN_LOOKUP
2585        .get(&mapping.description)
2586        .expect("missing builtin view");
2587    let Builtin::View(view) = builtin else {
2588        unreachable!("programming error, expected BuiltinView found {builtin:?}");
2589    };
2590
2591    (
2592        view,
2593        mapping.unique_identifier.catalog_id,
2594        mapping.unique_identifier.global_id,
2595    )
2596}