mz_adapter/catalog/apply.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to applying updates from a [`mz_catalog::durable::DurableCatalogState`] to a
//! [`CatalogState`].

use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::iter;
use std::str::FromStr;
use std::sync::Arc;

use futures::future;
use itertools::{Either, Itertools};
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
};
use mz_catalog::durable::objects::{
    ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
    SchemaKey,
};
use mz_catalog::durable::{CatalogError, SystemObjectMapping};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
    NetworkPolicy, Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
    TableDataSource, TemporaryItem, Type, UpdateFrom,
};
use mz_compute_types::config::ComputeReplicaConfig;
use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
use mz_controller_types::ClusterId;
use mz_expr::MirScalarExpr;
use mz_ore::collections::CollectionExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument, soft_assert_no_log};
use mz_pgrepr::oid::INVALID_OID;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
use mz_sql::catalog::CatalogError as SqlCatalogError;
use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
use mz_sql::names::{
    FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
};
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
use mz_sql::session::vars::{VarError, VarInput};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::Expr;
use mz_storage_types::sources::Timeline;
use tracing::{Instrument, info_span, warn};

use crate::AdapterError;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState};
use crate::util::index_sql;

/// Maintains the state of retractions while applying catalog state updates for a single timestamp.
/// [`CatalogState`] maintains denormalized state for certain catalog objects. Updating an object
/// results in applying a retraction for that object followed by applying an addition for that
/// object. When applying those additions, it can be extremely expensive to rebuild that
/// denormalized state from scratch. To avoid that issue we stash the denormalized state from
/// retractions, so it can be used during additions.
///
/// Not all objects maintain denormalized state, so we only stash the retractions for the subset of
/// objects that maintain denormalized state.
// TODO(jkosh44) It might be simpler or more future-proof to include all object types here, even if
// the update step is a no-op for certain types.
#[derive(Debug, Clone, Default)]
struct InProgressRetractions {
    roles: BTreeMap<RoleKey, Role>,
    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
    databases: BTreeMap<DatabaseKey, Database>,
    schemas: BTreeMap<SchemaKey, Schema>,
    clusters: BTreeMap<ClusterKey, Cluster>,
    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
    items: BTreeMap<ItemKey, CatalogEntry>,
    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}
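
// Illustrative sketch (added commentary, not from the original source): renaming an item shows up
// at a single timestamp as a retraction of its `CatalogEntry` followed by an addition. Roughly:
//
//     // pseudocode; see `apply_item_update` below
//     let entry = state.drop_item(item_id);        // retraction
//     retractions.items.insert(key, entry);        // stash the already-planned entry
//     // ...the addition at the same timestamp then reuses the stash...
//     let mut entry = retractions.items.remove(&key).expect("retracted in this batch");
//     entry.name = new_name;                       // cheap update; no re-parse or re-plan
//
// Without the stash, the addition would have to re-parse, re-plan, and re-optimize the item's
// `CREATE` SQL from scratch.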

impl CatalogState {
    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
    ///
    /// Returns builtin table updates corresponding to the changes to catalog state.
    ///
    /// This is meant specifically for bootstrapping because it batches and applies builtin view
    /// additions separately from other update types.
    #[must_use]
    #[instrument]
    pub(crate) async fn apply_updates_for_bootstrap(
        &mut self,
        updates: Vec<StateUpdate>,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let updates = sort_updates(updates);

        let mut groups: Vec<Vec<_>> = Vec::new();
        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
            groups.push(updates.collect());
        }
        for updates in groups {
            let mut apply_state = BootstrapApplyState::Updates(Vec::new());
            let mut retractions = InProgressRetractions::default();

            for update in updates {
                let next_apply_state = BootstrapApplyState::new(update);
                let (next_apply_state, builtin_table_update) = apply_state
                    .step(
                        next_apply_state,
                        self,
                        &mut retractions,
                        local_expression_cache,
                    )
                    .await;
                apply_state = next_apply_state;
                builtin_table_updates.extend(builtin_table_update);
            }

            // Apply remaining state.
            let builtin_table_update = apply_state
                .apply(self, &mut retractions, local_expression_cache)
                .await;
            builtin_table_updates.extend(builtin_table_update);
        }
        builtin_table_updates
    }

    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
    ///
    /// Returns builtin table updates corresponding to the changes to catalog state.
    #[instrument]
    pub(crate) fn apply_updates(
        &mut self,
        updates: Vec<StateUpdate>,
    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let updates = sort_updates(updates);

        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
            let mut retractions = InProgressRetractions::default();
            let builtin_table_update = self.apply_updates_inner(
                updates.collect(),
                &mut retractions,
                &mut LocalExpressionCache::Closed,
            )?;
            builtin_table_updates.extend(builtin_table_update);
        }

        Ok(builtin_table_updates)
    }
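
    // A minimal caller-side sketch (assumed usage, not from this file): after pulling a batch of
    // updates out of the durable catalog, a caller would apply them and append the returned rows
    // to the builtin tables. The variable names below are hypothetical.
    //
    //     let builtin_updates = state.apply_updates(durable_updates)?;
    //     builtin_table_writes.extend(builtin_updates);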

    #[instrument(level = "debug")]
    fn apply_updates_inner(
        &mut self,
        updates: Vec<StateUpdate>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
        soft_assert_no_log!(
            updates.iter().map(|update| update.ts).all_equal(),
            "all timestamps should be equal: {updates:?}"
        );

        let mut update_system_config = false;

        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        for StateUpdate { kind, ts: _, diff } in updates {
            if matches!(kind, StateUpdateKind::SystemConfiguration(_)) {
                update_system_config = true;
            }

            match diff {
                StateDiff::Retraction => {
                    // We want the builtin table retraction to match the state of the catalog
                    // before applying the update.
                    builtin_table_updates
                        .extend(self.generate_builtin_table_update(kind.clone(), diff));
                    self.apply_update(kind, diff, retractions, local_expression_cache)?;
                }
                StateDiff::Addition => {
                    self.apply_update(kind.clone(), diff, retractions, local_expression_cache)?;
                    // We want the builtin table addition to match the state of the catalog
                    // after applying the update.
                    builtin_table_updates
                        .extend(self.generate_builtin_table_update(kind.clone(), diff));
                }
            }
        }

        if update_system_config {
            self.system_configuration.dyncfg_updates();
        }

        Ok(builtin_table_updates)
    }
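
    // For example (illustrative, not from the original source): when a rename yields a retraction
    // and an addition at the same timestamp, the retraction row is packed while the catalog still
    // holds the old name and the addition row is packed only after the new name has been applied,
    // so the builtin tables always reflect a consistent view of the catalog.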

    #[instrument(level = "debug")]
    fn apply_update(
        &mut self,
        kind: StateUpdateKind,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<(), CatalogError> {
        match kind {
            StateUpdateKind::Role(role) => {
                self.apply_role_update(role, diff, retractions);
            }
            StateUpdateKind::RoleAuth(role_auth) => {
                self.apply_role_auth_update(role_auth, diff, retractions);
            }
            StateUpdateKind::Database(database) => {
                self.apply_database_update(database, diff, retractions);
            }
            StateUpdateKind::Schema(schema) => {
                self.apply_schema_update(schema, diff, retractions);
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                self.apply_default_privilege_update(default_privilege, diff, retractions);
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                self.apply_system_privilege_update(system_privilege, diff, retractions);
            }
            StateUpdateKind::SystemConfiguration(system_configuration) => {
                self.apply_system_configuration_update(system_configuration, diff, retractions);
            }
            StateUpdateKind::Cluster(cluster) => {
                self.apply_cluster_update(cluster, diff, retractions);
            }
            StateUpdateKind::NetworkPolicy(network_policy) => {
                self.apply_network_policy_update(network_policy, diff, retractions);
            }
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.apply_introspection_source_index_update(
                    introspection_source_index,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => {
                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
            }
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                self.apply_system_object_mapping_update(
                    system_object_mapping,
                    diff,
                    retractions,
                    local_expression_cache,
                );
            }
            StateUpdateKind::TemporaryItem(item) => {
                self.apply_temporary_item_update(item, diff, retractions);
            }
            StateUpdateKind::Item(item) => {
                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
            }
            StateUpdateKind::Comment(comment) => {
                self.apply_comment_update(comment, diff, retractions);
            }
            StateUpdateKind::SourceReferences(source_reference) => {
                self.apply_source_references_update(source_reference, diff, retractions);
            }
            StateUpdateKind::AuditLog(_audit_log) => {
                // Audit logs are not stored in-memory.
            }
            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
                self.apply_storage_collection_metadata_update(
                    storage_collection_metadata,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
            }
        }

        Ok(())
    }

    #[instrument(level = "debug")]
    fn apply_role_auth_update(
        &mut self,
        role_auth: mz_catalog::durable::RoleAuth,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        apply_with_update(
            &mut self.role_auth_by_id,
            role_auth,
            |role_auth| role_auth.role_id,
            diff,
            &mut retractions.role_auths,
        );
    }

    #[instrument(level = "debug")]
    fn apply_role_update(
        &mut self,
        role: mz_catalog::durable::Role,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
        apply_with_update(
            &mut self.roles_by_id,
            role,
            |role| role.id,
            diff,
            &mut retractions.roles,
        );
    }

    #[instrument(level = "debug")]
    fn apply_database_update(
        &mut self,
        database: mz_catalog::durable::Database,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        apply_inverted_lookup(
            &mut self.database_by_name,
            &database.name,
            database.id,
            diff,
        );
        apply_with_update(
            &mut self.database_by_id,
            database,
            |database| database.id,
            diff,
            &mut retractions.databases,
        );
    }

    #[instrument(level = "debug")]
    fn apply_schema_update(
        &mut self,
        schema: mz_catalog::durable::Schema,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        let (schemas_by_id, schemas_by_name) = match &schema.database_id {
            Some(database_id) => {
                let db = self
                    .database_by_id
                    .get_mut(database_id)
                    .expect("catalog out of sync");
                (&mut db.schemas_by_id, &mut db.schemas_by_name)
            }
            None => (
                &mut self.ambient_schemas_by_id,
                &mut self.ambient_schemas_by_name,
            ),
        };
        apply_inverted_lookup(schemas_by_name, &schema.name, schema.id, diff);
        apply_with_update(
            schemas_by_id,
            schema,
            |schema| schema.id,
            diff,
            &mut retractions.schemas,
        );
    }

    #[instrument(level = "debug")]
    fn apply_default_privilege_update(
        &mut self,
        default_privilege: mz_catalog::durable::DefaultPrivilege,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        match diff {
            StateDiff::Addition => self
                .default_privileges
                .grant(default_privilege.object, default_privilege.acl_item),
            StateDiff::Retraction => self
                .default_privileges
                .revoke(&default_privilege.object, &default_privilege.acl_item),
        }
    }

    #[instrument(level = "debug")]
    fn apply_system_privilege_update(
        &mut self,
        system_privilege: MzAclItem,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        match diff {
            StateDiff::Addition => self.system_privileges.grant(system_privilege),
            StateDiff::Retraction => self.system_privileges.revoke(&system_privilege),
        }
    }

    #[instrument(level = "debug")]
    fn apply_system_configuration_update(
        &mut self,
        system_configuration: mz_catalog::durable::SystemConfiguration,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        let res = match diff {
            StateDiff::Addition => self.insert_system_configuration(
                &system_configuration.name,
                VarInput::Flat(&system_configuration.value),
            ),
            StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
        };
        match res {
            Ok(_) => (),
            // When system variables are deleted, nothing deletes them from the underlying
            // durable catalog, which isn't great. Still, we need to be able to ignore
            // unknown variables.
            Err(Error {
                kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
            }) => {
                warn!(%name, "unknown system parameter from catalog storage");
            }
            Err(e) => panic!("unable to update system variable: {e:?}"),
        }
    }

    #[instrument(level = "debug")]
    fn apply_cluster_update(
        &mut self,
        cluster: mz_catalog::durable::Cluster,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
        apply_with_update(
            &mut self.clusters_by_id,
            cluster,
            |cluster| cluster.id,
            diff,
            &mut retractions.clusters,
        );
    }

    #[instrument(level = "debug")]
    fn apply_network_policy_update(
        &mut self,
        policy: mz_catalog::durable::NetworkPolicy,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        apply_inverted_lookup(
            &mut self.network_policies_by_name,
            &policy.name,
            policy.id,
            diff,
        );
        apply_with_update(
            &mut self.network_policies_by_id,
            policy,
            |policy| policy.id,
            diff,
            &mut retractions.network_policies,
        );
    }

    #[instrument(level = "debug")]
    fn apply_introspection_source_index_update(
        &mut self,
        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        let cluster = self
            .clusters_by_id
            .get_mut(&introspection_source_index.cluster_id)
            .expect("catalog out of sync");
        let log = BUILTIN_LOG_LOOKUP
            .get(introspection_source_index.name.as_str())
            .expect("missing log");
        apply_inverted_lookup(
            &mut cluster.log_indexes,
            &log.variant,
            introspection_source_index.index_id,
            diff,
        );

        match diff {
            StateDiff::Addition => {
                if let Some(mut entry) = retractions
                    .introspection_source_indexes
                    .remove(&introspection_source_index.item_id)
                {
                    // This should only happen during startup as a result of builtin migrations. We
                    // create a new index item and replace the old one with it.
                    let (index_name, index) = self.create_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.index_id,
                    );
                    assert_eq!(entry.id, introspection_source_index.item_id);
                    assert_eq!(entry.oid, introspection_source_index.oid);
                    assert_eq!(entry.name, index_name);
                    entry.item = index;
                    self.insert_entry(entry);
                } else {
                    self.insert_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.item_id,
                        introspection_source_index.index_id,
                        introspection_source_index.oid,
                    );
                }
            }
            StateDiff::Retraction => {
                let entry = self.drop_item(introspection_source_index.item_id);
                retractions
                    .introspection_source_indexes
                    .insert(entry.id, entry);
            }
        }
    }

    #[instrument(level = "debug")]
    fn apply_cluster_replica_update(
        &mut self,
        cluster_replica: mz_catalog::durable::ClusterReplica,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        let cluster = self
            .clusters_by_id
            .get(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        let azs = cluster.availability_zones();
        let location = self
            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
            .expect("catalog in unexpected state");
        let cluster = self
            .clusters_by_id
            .get_mut(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        apply_inverted_lookup(
            &mut cluster.replica_id_by_name_,
            &cluster_replica.name,
            cluster_replica.replica_id,
            diff,
        );
        match diff {
            StateDiff::Retraction => {
                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
                assert!(
                    prev.is_some(),
                    "retraction does not match existing value: {:?}",
                    cluster_replica.replica_id
                );
            }
            StateDiff::Addition => {
                let logging = ReplicaLogging {
                    log_logging: cluster_replica.config.logging.log_logging,
                    interval: cluster_replica.config.logging.interval,
                };
                let config = ReplicaConfig {
                    location,
                    compute: ComputeReplicaConfig { logging },
                };
                let mem_cluster_replica = ClusterReplica {
                    name: cluster_replica.name.clone(),
                    cluster_id: cluster_replica.cluster_id,
                    replica_id: cluster_replica.replica_id,
                    config,
                    owner_id: cluster_replica.owner_id,
                };
                let prev = cluster
                    .replicas_by_id_
                    .insert(cluster_replica.replica_id, mem_cluster_replica);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value: {:?}",
                    cluster_replica.replica_id
                );
            }
        }
    }

    #[instrument(level = "debug")]
    fn apply_system_object_mapping_update(
        &mut self,
        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        let item_id = system_object_mapping.unique_identifier.catalog_id;
        let global_id = system_object_mapping.unique_identifier.global_id;

        if system_object_mapping.unique_identifier.runtime_alterable() {
            // Runtime-alterable system objects have real entries in the items
            // collection and so get handled through the normal `insert_item`
            // and `drop_item` code paths.
            return;
        }

        if let StateDiff::Retraction = diff {
            let entry = self.drop_item(item_id);
            retractions.system_object_mappings.insert(item_id, entry);
            return;
        }

        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
            // This implies that we updated the fingerprint for some builtin item. The retraction
            // was parsed, planned, and optimized using the compiled in definition, not the
            // definition from a previous version. So we can just stick the old entry back into the
            // catalog.
            self.insert_entry(entry);
            return;
        }

        let builtin = BUILTIN_LOOKUP
            .get(&system_object_mapping.description)
            .expect("missing builtin")
            .1;
        let schema_name = builtin.schema();
        let schema_id = self
            .ambient_schemas_by_name
            .get(schema_name)
            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
        let name = QualifiedItemName {
            qualifiers: ItemQualifiers {
                database_spec: ResolvedDatabaseSpecifier::Ambient,
                schema_spec: SchemaSpecifier::Id(*schema_id),
            },
            item: builtin.name().into(),
        };
        match builtin {
            Builtin::Log(log) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&log.access);
                self.insert_item(
                    item_id,
                    log.oid,
                    name.clone(),
                    CatalogItem::Log(Log {
                        variant: log.variant,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }

            Builtin::Table(table) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Table,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&table.access);

                self.insert_item(
                    item_id,
                    table.oid,
                    name.clone(),
                    CatalogItem::Table(Table {
                        create_sql: None,
                        desc: VersionedRelationDesc::new(table.desc.clone()),
                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                        conn_id: None,
                        resolved_ids: ResolvedIds::empty(),
                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: table.is_retained_metrics_object,
                        data_source: TableDataSource::TableWrites {
                            defaults: vec![Expr::null(); table.desc.arity()],
                        },
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::Index(index) => {
                let custom_logical_compaction_window =
                    index.is_retained_metrics_object.then(|| {
                        self.system_config()
                            .metrics_retention()
                            .try_into()
                            .expect("invalid metrics retention")
                    });
                // Indexes can't be versioned.
                let versions = BTreeMap::new();

                let item = self
                    .parse_item(
                        global_id,
                        &index.create_sql(),
                        &versions,
                        None,
                        index.is_retained_metrics_object,
                        custom_logical_compaction_window,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap index:\n\
                                    {}\n\
                                    error:\n\
                                    {:?}\n\n\
                                    make sure that the schema name is specified in the builtin index's create sql statement.",
                            index.name, e
                        )
                    });
                let CatalogItem::Index(_) = item else {
                    panic!(
                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
                        index.name
                    );
                };

                self.insert_item(
                    item_id,
                    index.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }
            Builtin::View(_) => {
                // `parse_builtin_views` is responsible for inserting all builtin views.
                unreachable!("views added elsewhere");
            }

            // Note: Element types must be loaded before array types.
            Builtin::Type(typ) => {
                let typ = self.resolve_builtin_type_references(typ);
                if let CatalogType::Array { element_reference } = typ.details.typ {
                    let entry = self.get_entry_mut(&element_reference);
                    let item_type = match &mut entry.item {
                        CatalogItem::Type(item_type) => item_type,
                        _ => unreachable!("types can only reference other types"),
                    };
                    item_type.details.array_id = Some(item_id);
                }

                // Assert that no built-in types are record types so that we don't
                // need to bother to build a description. Only record types need
                // descriptions.
                let desc = None;
                assert!(!matches!(typ.details.typ, CatalogType::Record { .. }));
                let schema_id = self.resolve_system_schema(typ.schema);

                self.insert_item(
                    item_id,
                    typ.oid,
                    QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(schema_id),
                        },
                        item: typ.name.to_owned(),
                    },
                    CatalogItem::Type(Type {
                        create_sql: None,
                        global_id,
                        details: typ.details.clone(),
                        desc,
                        resolved_ids: ResolvedIds::empty(),
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(vec![
                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
                    ]),
                );
            }

            Builtin::Func(func) => {
                // This OID is never used. `func` has a `Vec` of implementations and
                // each implementation has its own OID. Those are the OIDs that are
                // actually used by the system.
                let oid = INVALID_OID;
                self.insert_item(
                    item_id,
                    oid,
                    name.clone(),
                    CatalogItem::Func(Func {
                        inner: func.inner,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }

            Builtin::Source(coll) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&coll.access);

                self.insert_item(
                    item_id,
                    coll.oid,
                    name.clone(),
                    CatalogItem::Source(Source {
                        create_sql: None,
                        data_source: DataSourceDesc::Introspection(coll.data_source),
                        desc: coll.desc.clone(),
                        global_id,
                        timeline: Timeline::EpochMilliseconds,
                        resolved_ids: ResolvedIds::empty(),
                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: coll.is_retained_metrics_object,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::ContinualTask(ct) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&ct.access);
                // Continual Tasks can't be versioned.
                let versions = BTreeMap::new();

                let item = self
                    .parse_item(
                        global_id,
                        &ct.create_sql(),
                        &versions,
                        None,
                        false,
                        None,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap continual task:\n\
                                    {}\n\
                                    error:\n\
                                    {:?}\n\n\
                                    make sure that the schema name is specified in the builtin continual task's create sql statement.",
                            ct.name, e
                        )
                    });
                let CatalogItem::ContinualTask(_) = &item else {
                    panic!(
                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
                        ct.name
                    );
                };

                self.insert_item(
                    item_id,
                    ct.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::Connection(connection) => {
                // Connections can't be versioned.
                let versions = BTreeMap::new();
                let mut item = self
                    .parse_item(
                        global_id,
                        connection.sql,
                        &versions,
                        None,
                        false,
                        None,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap connection:\n\
                                    {}\n\
                                    error:\n\
                                    {:?}\n\n\
                                    make sure that the schema name is specified in the builtin connection's create sql statement.",
                            connection.name, e
                        )
                    });
                let CatalogItem::Connection(_) = &mut item else {
                    panic!(
                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
                        connection.name
                    );
                };

                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    connection.owner_id.clone(),
                )];
                acl_items.extend_from_slice(connection.access);

                self.insert_item(
                    item_id,
                    connection.oid,
                    name.clone(),
                    item,
                    connection.owner_id.clone(),
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
        }
    }

    #[instrument(level = "debug")]
    fn apply_temporary_item_update(
        &mut self,
        TemporaryItem {
            id,
            oid,
            name,
            item,
            owner_id,
            privileges,
        }: TemporaryItem,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        match diff {
            StateDiff::Addition => {
                let entry = match retractions.temp_items.remove(&id) {
                    Some(mut retraction) => {
                        assert_eq!(retraction.id, id);
                        retraction.item = item;
                        retraction.id = id;
                        retraction.oid = oid;
                        retraction.name = name;
                        retraction.owner_id = owner_id;
                        retraction.privileges = privileges;
                        retraction
                    }
                    None => CatalogEntry {
                        item,
                        referenced_by: Vec::new(),
                        used_by: Vec::new(),
                        id,
                        oid,
                        name,
                        owner_id,
                        privileges,
                    },
                };
                self.insert_entry(entry);
            }
            StateDiff::Retraction => {
                let entry = self.drop_item(id);
                retractions.temp_items.insert(id, entry);
            }
        }
    }

    #[instrument(level = "debug")]
    fn apply_item_update(
        &mut self,
        item: mz_catalog::durable::Item,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<(), CatalogError> {
        match diff {
            StateDiff::Addition => {
                let key = item.key();
                let mz_catalog::durable::Item {
                    id,
                    oid,
                    global_id,
                    schema_id,
                    name,
                    create_sql,
                    owner_id,
                    privileges,
                    extra_versions,
                } = item;
                let schema = self.find_non_temp_schema(&schema_id);
                let name = QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: schema.database().clone(),
                        schema_spec: schema.id().clone(),
                    },
                    item: name.clone(),
                };
                let entry = match retractions.items.remove(&key) {
                    Some(mut retraction) => {
                        assert_eq!(retraction.id, item.id);
                        // We only reparse the SQL if it's changed. Otherwise, we use the existing
                        // item. This is a performance optimization and not needed for correctness.
                        // This makes it difficult to use the `UpdateFrom` trait, but the structure
                        // is still the same as the trait.
                        if retraction.create_sql() != create_sql {
                            let item = self
                                .deserialize_item(
                                    global_id,
                                    &create_sql,
                                    &extra_versions,
                                    local_expression_cache,
                                    Some(retraction.item),
                                )
                                .unwrap_or_else(|e| {
                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
                                });
                            retraction.item = item;
                        }
                        retraction.id = id;
                        retraction.oid = oid;
                        retraction.name = name;
                        retraction.owner_id = owner_id;
                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);

                        retraction
                    }
                    None => {
                        let catalog_item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                None,
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });
                        CatalogEntry {
                            item: catalog_item,
                            referenced_by: Vec::new(),
                            used_by: Vec::new(),
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                        }
                    }
                };

                self.insert_entry(entry);
            }
            StateDiff::Retraction => {
                let entry = self.drop_item(item.id);
                let key = item.into_key_value().0;
                retractions.items.insert(key, entry);
            }
        }
        Ok(())
    }
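
    // Illustrative note (added commentary): an update that leaves `create_sql` untouched, such as
    // an ownership transfer, takes the cheap path above: the stashed retraction keeps its
    // already-planned item and only `owner_id`, `privileges`, and similar fields are overwritten,
    // so no SQL is re-parsed or re-planned.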

    #[instrument(level = "debug")]
    fn apply_comment_update(
        &mut self,
        comment: mz_catalog::durable::Comment,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        match diff {
            StateDiff::Addition => {
                let prev = self.comments.update_comment(
                    comment.object_id,
                    comment.sub_component,
                    Some(comment.comment),
                );
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            }
            StateDiff::Retraction => {
                let prev =
                    self.comments
                        .update_comment(comment.object_id, comment.sub_component, None);
                assert_eq!(
                    prev,
                    Some(comment.comment),
                    "retraction does not match existing value: ({:?}, {:?})",
                    comment.object_id,
                    comment.sub_component,
                );
            }
        }
    }

    #[instrument(level = "debug")]
    fn apply_source_references_update(
        &mut self,
        source_references: mz_catalog::durable::SourceReferences,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        match diff {
            StateDiff::Addition => {
                let prev = self
                    .source_references
                    .insert(source_references.source_id, source_references.into());
                assert!(
                    prev.is_none(),
                    "values must be explicitly retracted before inserting a new value: {prev:?}"
                );
            }
            StateDiff::Retraction => {
                let prev = self.source_references.remove(&source_references.source_id);
                assert!(
                    prev.is_some(),
1136                    "retraction for a non-existent existing value: {source_references:?}"
                );
            }
        }
    }

    #[instrument(level = "debug")]
    fn apply_storage_collection_metadata_update(
        &mut self,
        storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        apply_inverted_lookup(
            &mut self.storage_metadata.collection_metadata,
            &storage_collection_metadata.id,
            storage_collection_metadata.shard,
            diff,
        );
    }

    #[instrument(level = "debug")]
    fn apply_unfinalized_shard_update(
        &mut self,
        unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        match diff {
            StateDiff::Addition => {
                let newly_inserted = self
                    .storage_metadata
                    .unfinalized_shards
                    .insert(unfinalized_shard.shard);
                assert!(
                    newly_inserted,
                    "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
                );
            }
            StateDiff::Retraction => {
                let removed = self
                    .storage_metadata
                    .unfinalized_shards
                    .remove(&unfinalized_shard.shard);
                assert!(
                    removed,
                    "retraction does not match existing value: {unfinalized_shard:?}"
                );
            }
        }
    }

    /// Generate a list of `BuiltinTableUpdate`s that correspond to a list of updates made to the
    /// durable catalog.
    #[instrument]
    pub(crate) fn generate_builtin_table_updates(
        &self,
        updates: Vec<StateUpdate>,
    ) -> Vec<BuiltinTableUpdate> {
        let mut builtin_table_updates = Vec::new();
        for StateUpdate { kind, ts: _, diff } in updates {
            let builtin_table_update = self.generate_builtin_table_update(kind, diff);
            let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
            builtin_table_updates.extend(builtin_table_update);
        }
        builtin_table_updates
    }

    /// Generate a list of `BuiltinTableUpdate`s that correspond to a single update made to the
    /// durable catalog.
    #[instrument(level = "debug")]
    pub(crate) fn generate_builtin_table_update(
        &self,
        kind: StateUpdateKind,
        diff: StateDiff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let diff = diff.into();
        match kind {
            StateUpdateKind::Role(role) => {
                let mut builtin_table_updates = self.pack_role_update(role.id, diff);
                for group_id in role.membership.map.keys() {
                    builtin_table_updates
                        .push(self.pack_role_members_update(*group_id, role.id, diff))
                }
                builtin_table_updates
            }
            StateUpdateKind::Database(database) => {
                vec![self.pack_database_update(&database.id, diff)]
            }
            StateUpdateKind::Schema(schema) => {
                let db_spec = schema.database_id.into();
                vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                vec![self.pack_default_privileges_update(
                    &default_privilege.object,
                    &default_privilege.acl_item.grantee,
                    &default_privilege.acl_item.acl_mode,
                    diff,
                )]
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                vec![self.pack_system_privileges_update(system_privilege, diff)]
            }
            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.pack_item_update(introspection_source_index.item_id, diff)
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
                cluster_replica.cluster_id,
                &cluster_replica.name,
                diff,
            ),
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                // Runtime-alterable system objects have real entries in the
                // items collection and so get handled through the normal
                // `StateUpdateKind::Item`.
                if !system_object_mapping.unique_identifier.runtime_alterable() {
                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
                } else {
                    vec![]
                }
            }
            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
                comment.object_id,
                comment.sub_component,
                &comment.comment,
                diff,
            )],
            StateUpdateKind::SourceReferences(source_references) => {
                self.pack_source_references_update(&source_references, diff)
            }
            StateUpdateKind::AuditLog(audit_log) => {
                vec![
                    self.pack_audit_log_update(&audit_log.event, diff)
                        .expect("could not pack audit log update"),
                ]
            }
            StateUpdateKind::NetworkPolicy(policy) => self
                .pack_network_policy_update(&policy.id, diff)
                .expect("could not pack network policy update"),
            StateUpdateKind::StorageCollectionMetadata(_)
            | StateUpdateKind::UnfinalizedShard(_)
            | StateUpdateKind::RoleAuth(_) => Vec::new(),
        }
    }

    fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
        self.entry_by_id
            .get_mut(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
    }

    fn get_schema_mut(
        &mut self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &mut Schema {
        // Keep in sync with `get_schemas`
        match (database_spec, schema_spec) {
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
                .temporary_schemas
                .get_mut(conn_id)
                .expect("catalog out of sync"),
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
                .ambient_schemas_by_id
                .get_mut(id)
                .expect("catalog out of sync"),
            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
                .database_by_id
                .get_mut(database_id)
                .expect("catalog out of sync")
                .schemas_by_id
                .get_mut(schema_id)
                .expect("catalog out of sync"),
            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
                unreachable!("temporary schemas are in the ambient database")
            }
        }
    }

1321    /// Installs builtin views into the catalog. This is its own function so that the views can
1322    /// be parsed and optimized in parallel.
1323    ///
1324    /// The implementation is similar to `apply_updates_for_bootstrap` and determines dependency
1325    /// problems by sniffing out specific errors and then retrying once those dependencies are
1326    /// complete. This doesn't work for everything (e.g. casts, function implementations), so we
1327    /// also keep a catch-all bucket that is retried at the end. Additionally, because this
1328    /// executes in parallel, we must maintain a set of completed views; otherwise a race could
1329    /// leave a view stuck in the awaiting maps with nothing left to retrigger its attempt.
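    ///
    /// Conceptually, the retry loop below boils down to the following sketch (the names
    /// `parse_view`, `install`, and `MissingDependency` are illustrative placeholders, not the
    /// exact APIs used in the implementation):
    ///
    /// ```ignore
    /// while !ready.is_empty() || !awaiting_all.is_empty() {
    ///     if ready.is_empty() {
    ///         // Nothing is unblocked; fall back to the catch-all bucket.
    ///         ready.extend(awaiting_all.drain(..));
    ///     }
    ///     let id = ready.pop_front().unwrap();
    ///     match parse_view(id) {
    ///         Ok(view) => {
    ///             install(view);
    ///             completed.insert(id);
    ///             // Unblock anything that was waiting on this view.
    ///             ready.extend(awaiting_id_dependencies.remove(&id).unwrap_or_default());
    ///         }
    ///         // The dependency may have completed while this view was in flight.
    ///         Err(MissingDependency(dep)) if completed.contains(&dep) => ready.push_back(id),
    ///         Err(MissingDependency(dep)) => {
    ///             awaiting_id_dependencies.entry(dep).or_default().push(id)
    ///         }
    ///         // Errors that can't name a specific dependency (e.g. casts): retry at the end.
    ///         Err(_) => awaiting_all.push(id),
    ///     }
    /// }
    /// ```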
1330    #[instrument(name = "catalog::parse_views")]
1331    async fn parse_builtin_views(
1332        state: &mut CatalogState,
1333        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
1334        retractions: &mut InProgressRetractions,
1335        local_expression_cache: &mut LocalExpressionCache,
1336    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1337        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
1338        let (updates, additions): (Vec<_>, Vec<_>) =
1339            builtin_views
1340                .into_iter()
1341                .partition_map(|(view, item_id, gid)| {
1342                    match retractions.system_object_mappings.remove(&item_id) {
1343                        Some(entry) => Either::Left(entry),
1344                        None => Either::Right((view, item_id, gid)),
1345                    }
1346                });
1347
1348        for entry in updates {
1349            // This implies that we updated the fingerprint for some builtin view. The retraction
1350            // was parsed, planned, and optimized using the compiled-in definition, not the
1351            // definition from a previous version. So we can just stick the old entry back into the
1352            // catalog.
1353            let item_id = entry.id();
1354            state.insert_entry(entry);
1355            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
1356        }
1357
1358        let mut handles = Vec::new();
1359        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
1360            BTreeMap::new();
1361        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
1362        // Some errors are due to the implementation of casts or SQL functions that depend on some
1363        // view. Instead of figuring out the exact view dependency, delay these until the end.
1364        let mut awaiting_all = Vec::new();
1365        // Completed views, needed to avoid race conditions.
1366        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
1367        let mut completed_names: BTreeSet<String> = BTreeSet::new();
1368
1369        // Avoid some reference lifetime issues by not passing `builtin` into the spawned task.
1370        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
1371            .into_iter()
1372            .map(|(view, item_id, gid)| (item_id, (view, gid)))
1373            .collect();
1374        let item_ids: Vec<_> = views.keys().copied().collect();
1375
1376        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
1377        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
1378            if handles.is_empty() && ready.is_empty() {
1379                // Enqueue the views that were waiting for all the others.
1380                ready.extend(awaiting_all.drain(..));
1381            }
1382
1383            // Spawn tasks for all ready views.
1384            if !ready.is_empty() {
1385                let spawn_state = Arc::new(state.clone());
1386                while let Some(id) = ready.pop_front() {
1387                    let (view, global_id) = views.get(&id).expect("must exist");
1388                    let global_id = *global_id;
1389                    let create_sql = view.create_sql();
1390                    // Views can't be versioned.
1391                    let versions = BTreeMap::new();
1392
1393                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
1394                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
1395                    let task_state = Arc::clone(&spawn_state);
1396                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1397                    let handle = mz_ore::task::spawn(
1398                        || "parse view",
1399                        async move {
1400                            let res = task_state.parse_item_inner(
1401                                global_id,
1402                                &create_sql,
1403                                &versions,
1404                                None,
1405                                false,
1406                                None,
1407                                cached_expr,
1408                                None,
1409                            );
1410                            (id, global_id, res)
1411                        }
1412                        .instrument(span),
1413                    );
1414                    handles.push(handle);
1415                }
1416            }
1417
1418            // Wait for a view to be ready.
1419            let (handle, _idx, remaining) = future::select_all(handles).await;
1420            handles = remaining;
1421            let (id, global_id, res) = handle.expect("must join");
1422            let mut insert_cached_expr = |cached_expr| {
1423                if let Some(cached_expr) = cached_expr {
1424                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
1425                }
1426            };
1427            match res {
1428                Ok((item, uncached_expr)) => {
1429                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1430                        local_expression_cache.insert_uncached_expression(
1431                            global_id,
1432                            uncached_expr,
1433                            optimizer_features,
1434                        );
1435                    }
1436                    // Add item to catalog.
1437                    let (view, _gid) = views.remove(&id).expect("must exist");
1438                    let schema_id = state
1439                        .ambient_schemas_by_name
1440                        .get(view.schema)
1441                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
1442                    let qname = QualifiedItemName {
1443                        qualifiers: ItemQualifiers {
1444                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1445                            schema_spec: SchemaSpecifier::Id(*schema_id),
1446                        },
1447                        item: view.name.into(),
1448                    };
1449                    let mut acl_items = vec![rbac::owner_privilege(
1450                        mz_sql::catalog::ObjectType::View,
1451                        MZ_SYSTEM_ROLE_ID,
1452                    )];
1453                    acl_items.extend_from_slice(&view.access);
1454
1455                    state.insert_item(
1456                        id,
1457                        view.oid,
1458                        qname,
1459                        item,
1460                        MZ_SYSTEM_ROLE_ID,
1461                        PrivilegeMap::from_mz_acl_items(acl_items),
1462                    );
1463
1464                    // Enqueue any items waiting on this dependency.
1465                    let mut resolved_dependent_items = Vec::new();
1466                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
1467                        resolved_dependent_items.extend(dependent_items);
1468                    }
1469                    let entry = state.get_entry(&id);
1470                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
1471                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
1472                        resolved_dependent_items.extend(dependent_items);
1473                    }
1474                    ready.extend(resolved_dependent_items);
1475
1476                    completed_ids.insert(id);
1477                    completed_names.insert(full_name);
1478                }
1479                // If we were missing a dependency, wait for it to be added.
1480                Err((
1481                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
1482                    cached_expr,
1483                )) => {
1484                    insert_cached_expr(cached_expr);
1485                    if completed_ids.contains(&missing_dep) {
1486                        ready.push_back(id);
1487                    } else {
1488                        awaiting_id_dependencies
1489                            .entry(missing_dep)
1490                            .or_default()
1491                            .push(id);
1492                    }
1493                }
1494                // If we were missing a dependency, wait for it to be added.
1495                Err((
1496                    AdapterError::PlanError(plan::PlanError::Catalog(
1497                        SqlCatalogError::UnknownItem(missing_dep),
1498                    )),
1499                    cached_expr,
1500                )) => {
1501                    insert_cached_expr(cached_expr);
1502                    match CatalogItemId::from_str(&missing_dep) {
1503                        Ok(missing_dep) => {
1504                            if completed_ids.contains(&missing_dep) {
1505                                ready.push_back(id);
1506                            } else {
1507                                awaiting_id_dependencies
1508                                    .entry(missing_dep)
1509                                    .or_default()
1510                                    .push(id);
1511                            }
1512                        }
1513                        Err(_) => {
1514                            if completed_names.contains(&missing_dep) {
1515                                ready.push_back(id);
1516                            } else {
1517                                awaiting_name_dependencies
1518                                    .entry(missing_dep)
1519                                    .or_default()
1520                                    .push(id);
1521                            }
1522                        }
1523                    }
1524                }
1525                Err((
1526                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
1527                    cached_expr,
1528                )) => {
1529                    insert_cached_expr(cached_expr);
1530                    awaiting_all.push(id);
1531                }
1532                Err((e, _)) => {
1533                    let (bad_view, _gid) = views.get(&id).expect("must exist");
1534                    panic!(
1535                        "internal error: failed to load bootstrap view:\n\
1536                            {name}\n\
1537                            error:\n\
1538                            {e:?}\n\n\
1539                            Make sure that the schema name is specified in the builtin view's create sql statement.
1540                            ",
1541                        name = bad_view.name,
1542                    )
1543                }
1544            }
1545        }
1546
1547        assert!(awaiting_id_dependencies.is_empty());
1548        assert!(
1549            awaiting_name_dependencies.is_empty(),
1550            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
1551        );
1552        assert!(awaiting_all.is_empty());
1553        assert!(views.is_empty());
1554
1555        // Generate a builtin table update for all the new views.
1556        builtin_table_updates.extend(
1557            item_ids
1558                .into_iter()
1559                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
1560        );
1561
1562        builtin_table_updates
1563    }
1564
1565    /// Associates a name, [`CatalogItemId`], and entry.
1566    fn insert_entry(&mut self, entry: CatalogEntry) {
1567        if !entry.id.is_system() {
1568            if let Some(cluster_id) = entry.item.cluster_id() {
1569                self.clusters_by_id
1570                    .get_mut(&cluster_id)
1571                    .expect("catalog out of sync")
1572                    .bound_objects
1573                    .insert(entry.id);
1574            };
1575        }
1576
1577        for u in entry.references().items() {
1578            match self.entry_by_id.get_mut(u) {
1579                Some(metadata) => metadata.referenced_by.push(entry.id()),
1580                None => panic!(
1581                    "Catalog: missing dependent catalog item {} while installing {}",
1582                    &u,
1583                    self.resolve_full_name(entry.name(), entry.conn_id())
1584                ),
1585            }
1586        }
1587        for u in entry.uses() {
1588            // Ignore self for self-referential tasks (e.g. Continual Tasks), if
1589            // present.
1590            if u == entry.id() {
1591                continue;
1592            }
1593            match self.entry_by_id.get_mut(&u) {
1594                Some(metadata) => metadata.used_by.push(entry.id()),
1595                None => panic!(
1596                    "Catalog: missing dependent catalog item {} while installing {}",
1597                    &u,
1598                    self.resolve_full_name(entry.name(), entry.conn_id())
1599                ),
1600            }
1601        }
1602        for gid in entry.item.global_ids() {
1603            self.entry_by_global_id.insert(gid, entry.id());
1604        }
1605        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1606        let schema = self.get_schema_mut(
1607            &entry.name().qualifiers.database_spec,
1608            &entry.name().qualifiers.schema_spec,
1609            conn_id,
1610        );
1611
1612        let prev_id = match entry.item() {
1613            CatalogItem::Func(_) => schema
1614                .functions
1615                .insert(entry.name().item.clone(), entry.id()),
1616            CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1617            _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1618        };
1619
1620        assert!(
1621            prev_id.is_none(),
1622            "builtin name collision on {:?}",
1623            entry.name().item.clone()
1624        );
1625
1626        self.entry_by_id.insert(entry.id(), entry.clone());
1627    }
1628
1629    /// Associates a name, [`CatalogItemId`], and entry.
1630    fn insert_item(
1631        &mut self,
1632        id: CatalogItemId,
1633        oid: u32,
1634        name: QualifiedItemName,
1635        item: CatalogItem,
1636        owner_id: RoleId,
1637        privileges: PrivilegeMap,
1638    ) {
1639        let entry = CatalogEntry {
1640            item,
1641            name,
1642            id,
1643            oid,
1644            used_by: Vec::new(),
1645            referenced_by: Vec::new(),
1646            owner_id,
1647            privileges,
1648        };
1649
1650        self.insert_entry(entry);
1651    }
1652
1653    #[mz_ore::instrument(level = "trace")]
1654    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1655        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1656        for u in metadata.references().items() {
1657            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1658                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1659            }
1660        }
1661        for u in metadata.uses() {
1662            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1663                dep_metadata.used_by.retain(|u| *u != metadata.id())
1664            }
1665        }
1666        for gid in metadata.global_ids() {
1667            self.entry_by_global_id.remove(&gid);
1668        }
1669
1670        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1671        let schema = self.get_schema_mut(
1672            &metadata.name().qualifiers.database_spec,
1673            &metadata.name().qualifiers.schema_spec,
1674            conn_id,
1675        );
1676        if metadata.item_type() == CatalogItemType::Type {
1677            schema
1678                .types
1679                .remove(&metadata.name().item)
1680                .expect("catalog out of sync");
1681        } else {
1682            // Functions would need special handling, but we don't yet support
1683            // dropping functions.
1684            assert_ne!(metadata.item_type(), CatalogItemType::Func);
1685
1686            schema
1687                .items
1688                .remove(&metadata.name().item)
1689                .expect("catalog out of sync");
1690        };
1691
1692        if !id.is_system() {
1693            if let Some(cluster_id) = metadata.item().cluster_id() {
1694                assert!(
1695                    self.clusters_by_id
1696                        .get_mut(&cluster_id)
1697                        .expect("catalog out of sync")
1698                        .bound_objects
1699                        .remove(&id),
1700                    "catalog out of sync"
1701                );
1702            }
1703        }
1704
1705        metadata
1706    }
1707
1708    fn insert_introspection_source_index(
1709        &mut self,
1710        cluster_id: ClusterId,
1711        log: &'static BuiltinLog,
1712        item_id: CatalogItemId,
1713        global_id: GlobalId,
1714        oid: u32,
1715    ) {
1716        let (index_name, index) =
1717            self.create_introspection_source_index(cluster_id, log, global_id);
1718        self.insert_item(
1719            item_id,
1720            oid,
1721            index_name,
1722            index,
1723            MZ_SYSTEM_ROLE_ID,
1724            PrivilegeMap::default(),
1725        );
1726    }
1727
1728    fn create_introspection_source_index(
1729        &self,
1730        cluster_id: ClusterId,
1731        log: &'static BuiltinLog,
1732        global_id: GlobalId,
1733    ) -> (QualifiedItemName, CatalogItem) {
1734        let source_name = FullItemName {
1735            database: RawDatabaseSpecifier::Ambient,
1736            schema: log.schema.into(),
1737            item: log.name.into(),
1738        };
1739        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1740        let mut index_name = QualifiedItemName {
1741            qualifiers: ItemQualifiers {
1742                database_spec: ResolvedDatabaseSpecifier::Ambient,
1743                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1744            },
1745            item: index_name.clone(),
1746        };
1747        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1748        let index_item_name = index_name.item.clone();
1749        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1750        let index = CatalogItem::Index(Index {
1751            global_id,
1752            on: log_global_id,
1753            keys: log
1754                .variant
1755                .index_by()
1756                .into_iter()
1757                .map(MirScalarExpr::column)
1758                .collect(),
1759            create_sql: index_sql(
1760                index_item_name,
1761                cluster_id,
1762                source_name,
1763                &log.variant.desc(),
1764                &log.variant.index_by(),
1765            ),
1766            conn_id: None,
1767            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1768            cluster_id,
1769            is_retained_metrics_object: false,
1770            custom_logical_compaction_window: None,
1771        });
1772        (index_name, index)
1773    }
1774
1775    /// Insert system configuration `name` with `value`.
1776    ///
1777    /// Return a `bool` value indicating whether the configuration was modified
1778    /// by the call.
1779    fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1780        Ok(self.system_configuration.set(name, value)?)
1781    }
1782
1783    /// Reset system configuration `name`.
1784    ///
1785    /// Return a `bool` value indicating whether the configuration was modified
1786    /// by the call.
1787    fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1788        Ok(self.system_configuration.reset(name)?)
1789    }
1790}
1791
1792/// Sort [`StateUpdate`]s first by timestamp, then in dependency order within each timestamp.
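///
/// For example, grouping by timestamp before sorting within each timestamp behaves roughly like
/// the following sketch (plain `(ts, name)` tuples stand in for [`StateUpdate`]s):
///
/// ```ignore
/// use itertools::Itertools;
///
/// let mut updates: Vec<(u64, &str)> = vec![(2, "b"), (1, "a"), (2, "a"), (1, "b")];
/// // Stable sort, so updates within a timestamp keep their original relative order.
/// updates.sort_by_key(|(ts, _)| *ts);
/// let mut groups: Vec<Vec<(u64, &str)>> = Vec::new();
/// for (_, group) in &updates.into_iter().chunk_by(|(ts, _)| *ts) {
///     // Each group holds the updates for a single timestamp, ready for a dependency sort.
///     groups.push(group.collect());
/// }
/// assert_eq!(groups, vec![vec![(1, "a"), (1, "b")], vec![(2, "b"), (2, "a")]]);
/// ```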
1793fn sort_updates(mut updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1794    let mut sorted_updates = Vec::with_capacity(updates.len());
1795
1796    updates.sort_by_key(|update| update.ts);
1797    for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
1798        let sorted_ts_updates = sort_updates_inner(updates.collect());
1799        sorted_updates.extend(sorted_ts_updates);
1800    }
1801
1802    sorted_updates
1803}
1804
1805/// Sort [`StateUpdate`]s in dependency order for a single timestamp.
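///
/// For a single timestamp, the final output order produced below is:
///
/// ```text
/// retractions (reverse dependency order): post-item, item, builtin index, cluster,
///                                         other builtin, pre-cluster
/// additions   (dependency order):         pre-cluster, other builtin, cluster,
///                                         builtin index, item, post-item
/// ```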
1806fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1807    fn push_update<T>(
1808        update: T,
1809        diff: StateDiff,
1810        retractions: &mut Vec<T>,
1811        additions: &mut Vec<T>,
1812    ) {
1813        match diff {
1814            StateDiff::Retraction => retractions.push(update),
1815            StateDiff::Addition => additions.push(update),
1816        }
1817    }
1818
1819    soft_assert_no_log!(
1820        updates.iter().map(|update| update.ts).all_equal(),
1821        "all timestamps should be equal: {updates:?}"
1822    );
1823
1824    // Partition updates by type so that we can weave different update types into the right spots.
1825    let mut pre_cluster_retractions = Vec::new();
1826    let mut pre_cluster_additions = Vec::new();
1827    let mut cluster_retractions = Vec::new();
1828    let mut cluster_additions = Vec::new();
1829    let mut builtin_item_updates = Vec::new();
1830    let mut item_retractions = Vec::new();
1831    let mut item_additions = Vec::new();
1832    let mut temp_item_retractions = Vec::new();
1833    let mut temp_item_additions = Vec::new();
1834    let mut post_item_retractions = Vec::new();
1835    let mut post_item_additions = Vec::new();
1836    for update in updates {
1837        let diff = update.diff.clone();
1838        match update.kind {
1839            StateUpdateKind::Role(_)
1840            | StateUpdateKind::RoleAuth(_)
1841            | StateUpdateKind::Database(_)
1842            | StateUpdateKind::Schema(_)
1843            | StateUpdateKind::DefaultPrivilege(_)
1844            | StateUpdateKind::SystemPrivilege(_)
1845            | StateUpdateKind::SystemConfiguration(_)
1846            | StateUpdateKind::NetworkPolicy(_) => push_update(
1847                update,
1848                diff,
1849                &mut pre_cluster_retractions,
1850                &mut pre_cluster_additions,
1851            ),
1852            StateUpdateKind::Cluster(_)
1853            | StateUpdateKind::IntrospectionSourceIndex(_)
1854            | StateUpdateKind::ClusterReplica(_) => push_update(
1855                update,
1856                diff,
1857                &mut cluster_retractions,
1858                &mut cluster_additions,
1859            ),
1860            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1861                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
1862            }
1863            StateUpdateKind::TemporaryItem(item) => push_update(
1864                (item, update.ts, update.diff),
1865                diff,
1866                &mut temp_item_retractions,
1867                &mut temp_item_additions,
1868            ),
1869            StateUpdateKind::Item(item) => push_update(
1870                (item, update.ts, update.diff),
1871                diff,
1872                &mut item_retractions,
1873                &mut item_additions,
1874            ),
1875            StateUpdateKind::Comment(_)
1876            | StateUpdateKind::SourceReferences(_)
1877            | StateUpdateKind::AuditLog(_)
1878            | StateUpdateKind::StorageCollectionMetadata(_)
1879            | StateUpdateKind::UnfinalizedShard(_) => push_update(
1880                update,
1881                diff,
1882                &mut post_item_retractions,
1883                &mut post_item_additions,
1884            ),
1885        }
1886    }
1887
1888    // Sort builtin item updates by dependency.
1889    let builtin_item_updates = builtin_item_updates
1890        .into_iter()
1891        .map(|(system_object_mapping, ts, diff)| {
1892            let idx = BUILTIN_LOOKUP
1893                .get(&system_object_mapping.description)
1894                .expect("missing builtin")
1895                .0;
1896            (idx, system_object_mapping, ts, diff)
1897        })
1898        .sorted_by_key(|(idx, _, _, _)| *idx)
1899        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
1900
1901    // Further partition builtin item updates.
1902    let mut other_builtin_retractions = Vec::new();
1903    let mut other_builtin_additions = Vec::new();
1904    let mut builtin_index_retractions = Vec::new();
1905    let mut builtin_index_additions = Vec::new();
1906    for (builtin_item_update, ts, diff) in builtin_item_updates {
1907        match &builtin_item_update.description.object_type {
1908            CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
1909                StateUpdate {
1910                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
1911                    ts,
1912                    diff,
1913                },
1914                diff,
1915                &mut builtin_index_retractions,
1916                &mut builtin_index_additions,
1917            ),
1918            CatalogItemType::Table
1919            | CatalogItemType::Source
1920            | CatalogItemType::Sink
1921            | CatalogItemType::View
1922            | CatalogItemType::MaterializedView
1923            | CatalogItemType::Type
1924            | CatalogItemType::Func
1925            | CatalogItemType::Secret
1926            | CatalogItemType::Connection => push_update(
1927                StateUpdate {
1928                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
1929                    ts,
1930                    diff,
1931                },
1932                diff,
1933                &mut other_builtin_retractions,
1934                &mut other_builtin_additions,
1935            ),
1936        }
1937    }
1938
1939    /// Sort `CONNECTION` items.
1940    ///
1941    /// `CONNECTION`s can depend on one another, e.g. a `KAFKA CONNECTION` contains
1942    /// a list of `BROKERS` and these broker definitions can themselves reference other
1943    /// connections. This `BROKERS` list can also be `ALTER`ed and thus it's possible
1944    /// for a connection to depend on another with an ID greater than its own.
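    ///
    /// A minimal sketch of the same Kahn-style approach over plain integer IDs (illustrative
    /// only; the real input is `(Item, Timestamp, StateDiff)` tuples):
    ///
    /// ```ignore
    /// use std::collections::{BTreeMap, BTreeSet};
    ///
    /// // id -> set of ids it depends on; connection 1 references connection 3.
    /// let mut deps: BTreeMap<u64, BTreeSet<u64>> = BTreeMap::from([
    ///     (1, BTreeSet::from([3])),
    ///     (2, BTreeSet::new()),
    ///     (3, BTreeSet::new()),
    /// ]);
    /// let mut sorted = Vec::new();
    /// while !deps.is_empty() {
    ///     let ready: Vec<u64> = deps
    ///         .iter()
    ///         .filter(|(_, d)| d.is_empty())
    ///         .map(|(id, _)| *id)
    ///         .collect();
    ///     assert!(!ready.is_empty(), "cycle in connection graph");
    ///     for id in ready {
    ///         deps.remove(&id);
    ///         deps.values_mut().for_each(|d| {
    ///             d.remove(&id);
    ///         });
    ///         sorted.push(id);
    ///     }
    /// }
    /// // Dependencies always end up before the connections that reference them.
    /// assert_eq!(sorted, vec![2, 3, 1]);
    /// ```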
1945    fn sort_connections(connections: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
1946        let mut topo: BTreeMap<
1947            (mz_catalog::durable::Item, Timestamp, StateDiff),
1948            BTreeSet<CatalogItemId>,
1949        > = BTreeMap::default();
1950        let existing_connections: BTreeSet<_> = connections.iter().map(|item| item.0.id).collect();
1951
1952        // Initialize the dependency sets for the topological sort.
1953        tracing::debug!(?connections, "sorting connections");
1954        for (connection, ts, diff) in connections.drain(..) {
1955            let statement = mz_sql::parse::parse(&connection.create_sql)
1956                .expect("valid CONNECTION create_sql")
1957                .into_element()
1958                .ast;
1959            let mut dependencies = mz_sql::names::dependencies(&statement)
1960                .expect("failed to find dependencies of CONNECTION");
1961            // Be defensive and remove any possible self references.
1962            dependencies.remove(&connection.id);
1963            // It's possible we're applying updates to a connection where the
1964            // dependency already exists and thus it's not in `connections`.
1965            dependencies.retain(|dep| existing_connections.contains(dep));
1966
1967            // Be defensive and ensure we're not clobbering any items.
1968            assert_none!(topo.insert((connection, ts, diff), dependencies));
1969        }
1970        tracing::debug!(?topo, ?existing_connections, "built topological sort");
1971
1972        // Do a topological sort, pushing back into the provided Vec.
1973        while !topo.is_empty() {
1974            // Get all of the connections with no dependencies.
1975            let no_deps: Vec<_> = topo
1976                .iter()
1977                .filter_map(|(item, deps)| {
1978                    if deps.is_empty() {
1979                        Some(item.clone())
1980                    } else {
1981                        None
1982                    }
1983                })
1984                .collect();
1985
1986            // Cycle in our graph!
1987            if no_deps.is_empty() {
1988                panic!("programming error, cycle in Connections");
1989            }
1990
1991            // Process all of the items with no dependencies.
1992            for item in no_deps {
1993                // Remove the item from our topological sort.
1994                topo.remove(&item);
1995                // Remove this item from anything that depends on it.
1996                topo.values_mut().for_each(|deps| {
1997                    deps.remove(&item.0.id);
1998                });
1999                // Push it back into our list as "completed".
2000                connections.push(item);
2001            }
2002        }
2003    }
2004
2005    /// Sort item updates by dependency.
2006    ///
2007    /// First we group items into groups that are totally ordered by dependency. For example, when
2008    /// sorting all items by dependency we know that all tables can come after all sources, because
2009    /// a source can never depend on a table. Within these groups, the ID order matches the
2010    /// dependency order.
2011    ///
2012    /// It used to be the case that the ID order of ALL items matched the dependency order. However,
2013    /// certain migrations shuffled item IDs around such that this was no longer true. A much better
2014    /// approach would be to investigate each item, discover their exact dependencies, and then
2015    /// perform a topological sort. This is non-trivial because we only have the CREATE SQL of each
2016    /// item here. Within the SQL the dependent items are sometimes referred to by ID and sometimes
2017    /// referred to by name.
2018    ///
2019    /// The logic of this function should match [`sort_temp_item_updates`].
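    ///
    /// Concretely, the groups below are ordered as (earlier groups never depend on later ones):
    ///
    /// ```text
    /// types < funcs < secrets < connections < sources < tables
    ///       < views/materialized views/indexes < sinks < continual tasks
    /// ```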
2020    fn sort_item_updates(
2021        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2022    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2023        // Partition items into groups such that each item in one group has a fixed order relative
2024        // to all items in other groups. For example, all sinks are ordered after all tables.
2025        let mut types = Vec::new();
2026        // N.B. Functions can depend on system tables, but not user tables.
2027        // TODO(udf): This will change when UDFs are supported.
2028        let mut funcs = Vec::new();
2029        let mut secrets = Vec::new();
2030        let mut connections = Vec::new();
2031        let mut sources = Vec::new();
2032        let mut tables = Vec::new();
2033        let mut derived_items = Vec::new();
2034        let mut sinks = Vec::new();
2035        let mut continual_tasks = Vec::new();
2036
2037        for update in item_updates {
2038            match update.0.item_type() {
2039                CatalogItemType::Type => types.push(update),
2040                CatalogItemType::Func => funcs.push(update),
2041                CatalogItemType::Secret => secrets.push(update),
2042                CatalogItemType::Connection => connections.push(update),
2043                CatalogItemType::Source => sources.push(update),
2044                CatalogItemType::Table => tables.push(update),
2045                CatalogItemType::View
2046                | CatalogItemType::MaterializedView
2047                | CatalogItemType::Index => derived_items.push(update),
2048                CatalogItemType::Sink => sinks.push(update),
2049                CatalogItemType::ContinualTask => continual_tasks.push(update),
2050            }
2051        }
2052
2053        // Within each group, sort by ID.
2054        for group in [
2055            &mut types,
2056            &mut funcs,
2057            &mut secrets,
2058            &mut sources,
2059            &mut tables,
2060            &mut derived_items,
2061            &mut sinks,
2062            &mut continual_tasks,
2063        ] {
2064            group.sort_by_key(|(item, _, _)| item.id);
2065        }
2066
2067        // HACK(parkmycar): Connections are special and can depend on one another. Additionally
2068        // connections can be `ALTER`ed and thus a `CONNECTION` can depend on another whose ID
2069        // is greater than its own.
2070        sort_connections(&mut connections);
2071
2072        iter::empty()
2073            .chain(types)
2074            .chain(funcs)
2075            .chain(secrets)
2076            .chain(connections)
2077            .chain(sources)
2078            .chain(tables)
2079            .chain(derived_items)
2080            .chain(sinks)
2081            .chain(continual_tasks)
2082            .collect()
2083    }
2084
2085    let item_retractions = sort_item_updates(item_retractions);
2086    let item_additions = sort_item_updates(item_additions);
2087
2088    /// Sort temporary item updates by dependency.
2089    ///
2090    /// The logic of this function should match [`sort_item_updates`].
2091    fn sort_temp_item_updates(
2092        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2093    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2094        // Partition items into groups such that each item in one group has a fixed order relative
2095        // to all items in other groups. For example, all sinks are ordered after all tables.
2096        let mut types = Vec::new();
2097        // N.B. Functions can depend on system tables, but not user tables.
2098        let mut funcs = Vec::new();
2099        let mut secrets = Vec::new();
2100        let mut connections = Vec::new();
2101        let mut sources = Vec::new();
2102        let mut tables = Vec::new();
2103        let mut derived_items = Vec::new();
2104        let mut sinks = Vec::new();
2105        let mut continual_tasks = Vec::new();
2106
2107        for update in temp_item_updates {
2108            match update.0.item.typ() {
2109                CatalogItemType::Type => types.push(update),
2110                CatalogItemType::Func => funcs.push(update),
2111                CatalogItemType::Secret => secrets.push(update),
2112                CatalogItemType::Connection => connections.push(update),
2113                CatalogItemType::Source => sources.push(update),
2114                CatalogItemType::Table => tables.push(update),
2115                CatalogItemType::View
2116                | CatalogItemType::MaterializedView
2117                | CatalogItemType::Index => derived_items.push(update),
2118                CatalogItemType::Sink => sinks.push(update),
2119                CatalogItemType::ContinualTask => continual_tasks.push(update),
2120            }
2121        }
2122
2123        // Within each group, sort by ID.
2124        for group in [
2125            &mut types,
2126            &mut funcs,
2127            &mut secrets,
2128            &mut connections,
2129            &mut sources,
2130            &mut tables,
2131            &mut derived_items,
2132            &mut sinks,
2133            &mut continual_tasks,
2134        ] {
2135            group.sort_by_key(|(item, _, _)| item.id);
2136        }
2137
2138        iter::empty()
2139            .chain(types)
2140            .chain(funcs)
2141            .chain(secrets)
2142            .chain(connections)
2143            .chain(sources)
2144            .chain(tables)
2145            .chain(derived_items)
2146            .chain(sinks)
2147            .chain(continual_tasks)
2148            .collect()
2149    }
2150    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2151    let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2152
2153    /// Merge the sorted temporary and non-temporary item updates into a single list ordered by item ID.
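    ///
    /// This is a standard two-way merge of two ID-sorted queues; a minimal sketch over plain
    /// integer IDs (illustrative only):
    ///
    /// ```ignore
    /// use std::collections::VecDeque;
    ///
    /// let mut items: VecDeque<u64> = VecDeque::from([1, 4, 6]);
    /// let mut temp_items: VecDeque<u64> = VecDeque::from([2, 3, 5]);
    /// let mut merged = Vec::new();
    /// while let (Some(a), Some(b)) = (items.front(), temp_items.front()) {
    ///     if a < b {
    ///         merged.push(items.pop_front().unwrap());
    ///     } else {
    ///         merged.push(temp_items.pop_front().unwrap());
    ///     }
    /// }
    /// // One of the queues is now empty; drain whatever remains of the other.
    /// merged.extend(items);
    /// merged.extend(temp_items);
    /// assert_eq!(merged, vec![1, 2, 3, 4, 5, 6]);
    /// ```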
2154    fn merge_item_updates(
2155        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2156        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2157    ) -> Vec<StateUpdate> {
2158        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2159
2160        while let (Some((item, _, _)), Some((temp_item, _, _))) =
2161            (item_updates.front(), temp_item_updates.front())
2162        {
2163            if item.id < temp_item.id {
2164                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2165                state_updates.push(StateUpdate {
2166                    kind: StateUpdateKind::Item(item),
2167                    ts,
2168                    diff,
2169                });
2170            } else if item.id > temp_item.id {
2171                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2172                state_updates.push(StateUpdate {
2173                    kind: StateUpdateKind::TemporaryItem(temp_item),
2174                    ts,
2175                    diff,
2176                });
2177            } else {
2178                unreachable!(
2179                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2180                );
2181            }
2182        }
2183
2184        while let Some((item, ts, diff)) = item_updates.pop_front() {
2185            state_updates.push(StateUpdate {
2186                kind: StateUpdateKind::Item(item),
2187                ts,
2188                diff,
2189            });
2190        }
2191
2192        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2193            state_updates.push(StateUpdate {
2194                kind: StateUpdateKind::TemporaryItem(temp_item),
2195                ts,
2196                diff,
2197            });
2198        }
2199
2200        state_updates
2201    }
2202    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2203    let item_additions = merge_item_updates(item_additions, temp_item_additions);
2204
2205    // Put everything back together.
2206    iter::empty()
2207        // All retractions must be reversed.
2208        .chain(post_item_retractions.into_iter().rev())
2209        .chain(item_retractions.into_iter().rev())
2210        .chain(builtin_index_retractions.into_iter().rev())
2211        .chain(cluster_retractions.into_iter().rev())
2212        .chain(other_builtin_retractions.into_iter().rev())
2213        .chain(pre_cluster_retractions.into_iter().rev())
2214        .chain(pre_cluster_additions)
2215        .chain(other_builtin_additions)
2216        .chain(cluster_additions)
2217        .chain(builtin_index_additions)
2218        .chain(item_additions)
2219        .chain(post_item_additions)
2220        .collect()
2221}
2222
2223/// Most updates are applied one at a time, but during bootstrap, certain types are applied
2224/// separately in a batch for performance reasons. A constraint is that updates must be applied in
2225/// order. This process is modeled as a state machine that batches then applies groups of updates.
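///
/// For example, a sorted bootstrap stream whose kinds are `[View, View, Item, Item, Other]` is
/// applied as three batches: both builtin view additions together (so they can be parsed in
/// parallel), then both item updates, then the remaining update. A rough sketch of how a caller
/// might drive the state machine (illustrative only; `sorted_updates` and the surrounding
/// variables are placeholders):
///
/// ```ignore
/// let mut apply_state = BootstrapApplyState::Updates(Vec::new());
/// let mut builtin_table_updates = Vec::new();
/// for update in sorted_updates {
///     let next = BootstrapApplyState::new(update);
///     // `step` either keeps batching (same kind) or applies the current batch (kind change).
///     let (new_state, table_updates) = apply_state
///         .step(next, state, retractions, local_expression_cache)
///         .await;
///     apply_state = new_state;
///     builtin_table_updates.extend(table_updates);
/// }
/// // Apply whatever is still batched at the end of the stream.
/// builtin_table_updates
///     .extend(apply_state.apply(state, retractions, local_expression_cache).await);
/// ```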
2226enum BootstrapApplyState {
2227    /// Additions of builtin views.
2228    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
2229    /// Item updates that aren't builtin view additions.
2230    Items(Vec<StateUpdate>),
2231    /// All other updates.
2232    Updates(Vec<StateUpdate>),
2233}
2234
2235impl BootstrapApplyState {
2236    fn new(update: StateUpdate) -> BootstrapApplyState {
2237        match update {
2238            StateUpdate {
2239                kind: StateUpdateKind::SystemObjectMapping(system_object_mapping),
2240                diff: StateDiff::Addition,
2241                ..
2242            } if matches!(
2243                system_object_mapping.description.object_type,
2244                CatalogItemType::View
2245            ) =>
2246            {
2247                let view_addition = lookup_builtin_view_addition(system_object_mapping);
2248                BootstrapApplyState::BuiltinViewAdditions(vec![view_addition])
2249            }
2250            StateUpdate {
2251                kind: StateUpdateKind::IntrospectionSourceIndex(_),
2252                ..
2253            }
2254            | StateUpdate {
2255                kind: StateUpdateKind::SystemObjectMapping(_),
2256                ..
2257            }
2258            | StateUpdate {
2259                kind: StateUpdateKind::Item(_),
2260                ..
2261            } => BootstrapApplyState::Items(vec![update]),
2262            update => BootstrapApplyState::Updates(vec![update]),
2263        }
2264    }
2265
2266    /// Apply all updates that have been batched in `self`.
2267    ///
2268    /// We make sure to enable all "enable_for_item_parsing" feature flags when applying item
2269    /// updates during bootstrap. See [`CatalogState::with_enable_for_item_parsing`] for more
2270    /// details.
2271    async fn apply(
2272        self,
2273        state: &mut CatalogState,
2274        retractions: &mut InProgressRetractions,
2275        local_expression_cache: &mut LocalExpressionCache,
2276    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2277        match self {
2278            BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions) => {
2279                let restore = state.system_configuration.clone();
2280                state.system_configuration.enable_for_item_parsing();
2281                let builtin_table_updates = CatalogState::parse_builtin_views(
2282                    state,
2283                    builtin_view_additions,
2284                    retractions,
2285                    local_expression_cache,
2286                )
2287                .await;
2288                state.system_configuration = restore;
2289                builtin_table_updates
2290            }
2291            BootstrapApplyState::Items(updates) => state.with_enable_for_item_parsing(|state| {
2292                state
2293                    .apply_updates_inner(updates, retractions, local_expression_cache)
2294                    .expect("corrupt catalog")
2295            }),
2296            BootstrapApplyState::Updates(updates) => state
2297                .apply_updates_inner(updates, retractions, local_expression_cache)
2298                .expect("corrupt catalog"),
2299        }
2300    }
2301
2302    async fn step(
2303        self,
2304        next: BootstrapApplyState,
2305        state: &mut CatalogState,
2306        retractions: &mut InProgressRetractions,
2307        local_expression_cache: &mut LocalExpressionCache,
2308    ) -> (
2309        BootstrapApplyState,
2310        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2311    ) {
2312        match (self, next) {
2313            (
2314                BootstrapApplyState::BuiltinViewAdditions(mut builtin_view_additions),
2315                BootstrapApplyState::BuiltinViewAdditions(next_builtin_view_additions),
2316            ) => {
2317                // Continue batching builtin view additions.
2318                builtin_view_additions.extend(next_builtin_view_additions);
2319                (
2320                    BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions),
2321                    Vec::new(),
2322                )
2323            }
2324            (BootstrapApplyState::Items(mut updates), BootstrapApplyState::Items(next_updates)) => {
2325                // Continue batching item updates.
2326                updates.extend(next_updates);
2327                (BootstrapApplyState::Items(updates), Vec::new())
2328            }
2329            (
2330                BootstrapApplyState::Updates(mut updates),
2331                BootstrapApplyState::Updates(next_updates),
2332            ) => {
2333                // Continue batching updates.
2334                updates.extend(next_updates);
2335                (BootstrapApplyState::Updates(updates), Vec::new())
2336            }
2337            (apply_state, next_apply_state) => {
2338                // Apply the current batch and start batching new apply state.
2339                let builtin_table_update = apply_state
2340                    .apply(state, retractions, local_expression_cache)
2341                    .await;
2342                (next_apply_state, builtin_table_update)
2343            }
2344        }
2345    }
2346}
2347
2348/// Helper method to update inverted lookup maps. The keys are generally names and the values are
2349/// generally IDs.
2350///
2351/// Importantly, when retracting, the existing value is expected to match `value` exactly.
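///
/// For example (illustrative only; the function is private to this module):
///
/// ```ignore
/// use std::collections::BTreeMap;
///
/// let mut roles_by_name: BTreeMap<String, u64> = BTreeMap::new();
/// // An addition inserts the mapping and asserts that the key was previously absent.
/// apply_inverted_lookup(&mut roles_by_name, &"admin".to_string(), 42, StateDiff::Addition);
/// assert_eq!(roles_by_name.get("admin"), Some(&42));
/// // A retraction removes the mapping and asserts that the stored value matches exactly.
/// apply_inverted_lookup(&mut roles_by_name, &"admin".to_string(), 42, StateDiff::Retraction);
/// assert!(roles_by_name.is_empty());
/// ```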
2352fn apply_inverted_lookup<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: V, diff: StateDiff)
2353where
2354    K: Ord + Clone + Debug,
2355    V: PartialEq + Debug,
2356{
2357    match diff {
2358        StateDiff::Retraction => {
2359            let prev = map.remove(key);
2360            assert_eq!(
2361                prev,
2362                Some(value),
2363                "retraction does not match existing value: {key:?}"
2364            );
2365        }
2366        StateDiff::Addition => {
2367            let prev = map.insert(key.clone(), value);
2368            assert_eq!(
2369                prev, None,
2370                "values must be explicitly retracted before inserting a new value: {key:?}"
2371            );
2372        }
2373    }
2374}
2375
2376/// Helper method to update catalog state that may need to be rebuilt from the denormalized state
2377/// of a previously retracted object.
2378fn apply_with_update<K, V, D>(
2379    map: &mut BTreeMap<K, V>,
2380    durable: D,
2381    key_fn: impl FnOnce(&D) -> K,
2382    diff: StateDiff,
2383    retractions: &mut BTreeMap<D::Key, V>,
2384) where
2385    K: Ord,
2386    V: UpdateFrom<D> + PartialEq + Debug,
2387    D: DurableType,
2388    D::Key: Ord,
2389{
2390    match diff {
2391        StateDiff::Retraction => {
2392            let mem_key = key_fn(&durable);
2393            let value = map
2394                .remove(&mem_key)
2395                .expect("retraction does not match existing value: {key:?}");
2396            let durable_key = durable.into_key_value().0;
2397            retractions.insert(durable_key, value);
2398        }
2399        StateDiff::Addition => {
2400            let mem_key = key_fn(&durable);
2401            let durable_key = durable.key();
2402            let value = match retractions.remove(&durable_key) {
2403                Some(mut retraction) => {
2404                    retraction.update_from(durable);
2405                    retraction
2406                }
2407                None => durable.into(),
2408            };
2409            let prev = map.insert(mem_key, value);
2410            assert_eq!(
2411                prev, None,
2412                "values must be explicitly retracted before inserting a new value"
2413            );
2414        }
2415    }
2416}
2417
2418/// Looks up a [`BuiltinView`] from a [`SystemObjectMapping`].
2419fn lookup_builtin_view_addition(
2420    mapping: SystemObjectMapping,
2421) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2422    let (_, builtin) = BUILTIN_LOOKUP
2423        .get(&mapping.description)
2424        .expect("missing builtin view");
2425    let Builtin::View(view) = builtin else {
2426        unreachable!("programming error, expected BuiltinView found {builtin:?}");
2427    };
2428
2429    (
2430        view,
2431        mapping.unique_identifier.catalog_id,
2432        mapping.unique_identifier.global_id,
2433    )
2434}