mz_adapter/catalog/
apply.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to applying updates from a [`mz_catalog::durable::DurableCatalogState`] to a
11//! [`CatalogState`].
12
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures::future;
20use itertools::{Either, Itertools};
21use mz_adapter_types::connection::ConnectionId;
22use mz_catalog::SYSTEM_CONN_ID;
23use mz_catalog::builtin::{
24    BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
25};
26use mz_catalog::durable::objects::{
27    ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
28    SchemaKey,
29};
30use mz_catalog::durable::{CatalogError, SystemObjectMapping};
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
34    NetworkPolicy, Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
35    TableDataSource, TemporaryItem, Type, UpdateFrom,
36};
37use mz_compute_types::config::ComputeReplicaConfig;
38use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
39use mz_controller_types::ClusterId;
40use mz_expr::MirScalarExpr;
41use mz_ore::tracing::OpenTelemetryContext;
42use mz_ore::{instrument, soft_assert_no_log};
43use mz_pgrepr::oid::INVALID_OID;
44use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
45use mz_repr::role_id::RoleId;
46use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
47use mz_sql::catalog::CatalogError as SqlCatalogError;
48use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
49use mz_sql::names::{
50    FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
51    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
52};
53use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
54use mz_sql::session::vars::{VarError, VarInput};
55use mz_sql::{plan, rbac};
56use mz_sql_parser::ast::Expr;
57use mz_storage_types::sources::Timeline;
58use tracing::{Instrument, info_span, warn};
59
60use crate::AdapterError;
61use crate::catalog::state::LocalExpressionCache;
62use crate::catalog::{BuiltinTableUpdate, CatalogState};
63use crate::util::index_sql;
64
65/// Maintains the state of retractions while applying catalog state updates for a single timestamp.
66/// [`CatalogState`] maintains denormalized state for certain catalog objects. Updating an object
67/// results in applying a retraction for that object followed by applying an addition for that
68/// object. When applying those additions it can be extremely expensive to re-build that
69/// denormalized state from scratch. To avoid that issue we stash the denormalized state from
70/// retractions, so it can be used during additions.
71///
72/// Not all objects maintain denormalized state, so we only stash the retractions for the subset of
73/// objects that maintain denormalized state.
74// TODO(jkosh44) It might be simpler or more future proof to include all object types here, even if
75// the update step is a no-op for certain types.
76#[derive(Debug, Clone, Default)]
77struct InProgressRetractions {
78    roles: BTreeMap<RoleKey, Role>,
79    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
80    databases: BTreeMap<DatabaseKey, Database>,
81    schemas: BTreeMap<SchemaKey, Schema>,
82    clusters: BTreeMap<ClusterKey, Cluster>,
83    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
84    items: BTreeMap<ItemKey, CatalogEntry>,
85    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
86    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
87    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
88}
89
90impl CatalogState {
91    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
92    ///
93    /// Returns builtin table updates corresponding to the changes to catalog state.
94    ///
95    /// This is meant specifically for bootstrapping because it batches and applies builtin view
96    /// additions separately from other update types.
97    #[must_use]
98    #[instrument]
99    pub(crate) async fn apply_updates_for_bootstrap(
100        &mut self,
101        updates: Vec<StateUpdate>,
102        local_expression_cache: &mut LocalExpressionCache,
103    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
104        let mut builtin_table_updates = Vec::with_capacity(updates.len());
105        let updates = sort_updates(updates);
106
107        let mut groups: Vec<Vec<_>> = Vec::new();
108        for (_, updates) in &updates.into_iter().group_by(|update| update.ts) {
109            groups.push(updates.collect());
110        }
111        for updates in groups {
112            let mut apply_state = BootstrapApplyState::Updates(Vec::new());
113            let mut retractions = InProgressRetractions::default();
114
115            for update in updates {
116                let next_apply_state = BootstrapApplyState::new(update);
117                let (next_apply_state, builtin_table_update) = apply_state
118                    .step(
119                        next_apply_state,
120                        self,
121                        &mut retractions,
122                        local_expression_cache,
123                    )
124                    .await;
125                apply_state = next_apply_state;
126                builtin_table_updates.extend(builtin_table_update);
127            }
128
129            // Apply remaining state.
130            let builtin_table_update = apply_state
131                .apply(self, &mut retractions, local_expression_cache)
132                .await;
133            builtin_table_updates.extend(builtin_table_update);
134        }
135        builtin_table_updates
136    }
137
138    /// Update in-memory catalog state from a list of updates made to the durable catalog state.
139    ///
140    /// Returns builtin table updates corresponding to the changes to catalog state.
141    #[instrument]
142    pub(crate) fn apply_updates(
143        &mut self,
144        updates: Vec<StateUpdate>,
145    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
146        let mut builtin_table_updates = Vec::with_capacity(updates.len());
147        let updates = sort_updates(updates);
148
149        for (_, updates) in &updates.into_iter().group_by(|update| update.ts) {
150            let mut retractions = InProgressRetractions::default();
151            let builtin_table_update = self.apply_updates_inner(
152                updates.collect(),
153                &mut retractions,
154                &mut LocalExpressionCache::Closed,
155            )?;
156            builtin_table_updates.extend(builtin_table_update);
157        }
158
159        Ok(builtin_table_updates)
160    }
161
162    #[instrument(level = "debug")]
163    fn apply_updates_inner(
164        &mut self,
165        updates: Vec<StateUpdate>,
166        retractions: &mut InProgressRetractions,
167        local_expression_cache: &mut LocalExpressionCache,
168    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
169        soft_assert_no_log!(
170            updates.iter().map(|update| update.ts).all_equal(),
171            "all timestamps should be equal: {updates:?}"
172        );
173
174        let mut update_system_config = false;
175
176        let mut builtin_table_updates = Vec::with_capacity(updates.len());
177        for StateUpdate { kind, ts: _, diff } in updates {
178            if matches!(kind, StateUpdateKind::SystemConfiguration(_)) {
179                update_system_config = true;
180            }
181
182            match diff {
183                StateDiff::Retraction => {
184                    // We want the builtin table retraction to match the state of the catalog
185                    // before applying the update.
186                    builtin_table_updates
187                        .extend(self.generate_builtin_table_update(kind.clone(), diff));
188                    self.apply_update(kind, diff, retractions, local_expression_cache)?;
189                }
190                StateDiff::Addition => {
191                    self.apply_update(kind.clone(), diff, retractions, local_expression_cache)?;
192                    // We want the builtin table addition to match the state of the catalog
193                    // after applying the update.
194                    builtin_table_updates
195                        .extend(self.generate_builtin_table_update(kind.clone(), diff));
196                }
197            }
198        }
199
200        if update_system_config {
201            self.system_configuration.dyncfg_updates();
202        }
203
204        Ok(builtin_table_updates)
205    }
206
207    #[instrument(level = "debug")]
208    fn apply_update(
209        &mut self,
210        kind: StateUpdateKind,
211        diff: StateDiff,
212        retractions: &mut InProgressRetractions,
213        local_expression_cache: &mut LocalExpressionCache,
214    ) -> Result<(), CatalogError> {
215        match kind {
216            StateUpdateKind::Role(role) => {
217                self.apply_role_update(role, diff, retractions);
218            }
219            StateUpdateKind::RoleAuth(role_auth) => {
220                self.apply_role_auth_update(role_auth, diff, retractions);
221            }
222            StateUpdateKind::Database(database) => {
223                self.apply_database_update(database, diff, retractions);
224            }
225            StateUpdateKind::Schema(schema) => {
226                self.apply_schema_update(schema, diff, retractions);
227            }
228            StateUpdateKind::DefaultPrivilege(default_privilege) => {
229                self.apply_default_privilege_update(default_privilege, diff, retractions);
230            }
231            StateUpdateKind::SystemPrivilege(system_privilege) => {
232                self.apply_system_privilege_update(system_privilege, diff, retractions);
233            }
234            StateUpdateKind::SystemConfiguration(system_configuration) => {
235                self.apply_system_configuration_update(system_configuration, diff, retractions);
236            }
237            StateUpdateKind::Cluster(cluster) => {
238                self.apply_cluster_update(cluster, diff, retractions);
239            }
240            StateUpdateKind::NetworkPolicy(network_policy) => {
241                self.apply_network_policy_update(network_policy, diff, retractions);
242            }
243            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
244                self.apply_introspection_source_index_update(
245                    introspection_source_index,
246                    diff,
247                    retractions,
248                );
249            }
250            StateUpdateKind::ClusterReplica(cluster_replica) => {
251                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
252            }
253            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
254                self.apply_system_object_mapping_update(
255                    system_object_mapping,
256                    diff,
257                    retractions,
258                    local_expression_cache,
259                );
260            }
261            StateUpdateKind::TemporaryItem(item) => {
262                self.apply_temporary_item_update(item, diff, retractions);
263            }
264            StateUpdateKind::Item(item) => {
265                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
266            }
267            StateUpdateKind::Comment(comment) => {
268                self.apply_comment_update(comment, diff, retractions);
269            }
270            StateUpdateKind::SourceReferences(source_reference) => {
271                self.apply_source_references_update(source_reference, diff, retractions);
272            }
273            StateUpdateKind::AuditLog(_audit_log) => {
274                // Audit logs are not stored in-memory.
275            }
276            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
277                self.apply_storage_collection_metadata_update(
278                    storage_collection_metadata,
279                    diff,
280                    retractions,
281                );
282            }
283            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
284                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
285            }
286        }
287
288        Ok(())
289    }
290
291    #[instrument(level = "debug")]
292    fn apply_role_auth_update(
293        &mut self,
294        role_auth: mz_catalog::durable::RoleAuth,
295        diff: StateDiff,
296        retractions: &mut InProgressRetractions,
297    ) {
298        apply_with_update(
299            &mut self.role_auth_by_id,
300            role_auth,
301            |role_auth| role_auth.role_id,
302            diff,
303            &mut retractions.role_auths,
304        );
305    }
306
307    #[instrument(level = "debug")]
308    fn apply_role_update(
309        &mut self,
310        role: mz_catalog::durable::Role,
311        diff: StateDiff,
312        retractions: &mut InProgressRetractions,
313    ) {
314        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
315        apply_with_update(
316            &mut self.roles_by_id,
317            role,
318            |role| role.id,
319            diff,
320            &mut retractions.roles,
321        );
322    }
323
324    #[instrument(level = "debug")]
325    fn apply_database_update(
326        &mut self,
327        database: mz_catalog::durable::Database,
328        diff: StateDiff,
329        retractions: &mut InProgressRetractions,
330    ) {
331        apply_inverted_lookup(
332            &mut self.database_by_name,
333            &database.name,
334            database.id,
335            diff,
336        );
337        apply_with_update(
338            &mut self.database_by_id,
339            database,
340            |database| database.id,
341            diff,
342            &mut retractions.databases,
343        );
344    }
345
346    #[instrument(level = "debug")]
347    fn apply_schema_update(
348        &mut self,
349        schema: mz_catalog::durable::Schema,
350        diff: StateDiff,
351        retractions: &mut InProgressRetractions,
352    ) {
353        let (schemas_by_id, schemas_by_name) = match &schema.database_id {
354            Some(database_id) => {
355                let db = self
356                    .database_by_id
357                    .get_mut(database_id)
358                    .expect("catalog out of sync");
359                (&mut db.schemas_by_id, &mut db.schemas_by_name)
360            }
361            None => (
362                &mut self.ambient_schemas_by_id,
363                &mut self.ambient_schemas_by_name,
364            ),
365        };
366        apply_inverted_lookup(schemas_by_name, &schema.name, schema.id, diff);
367        apply_with_update(
368            schemas_by_id,
369            schema,
370            |schema| schema.id,
371            diff,
372            &mut retractions.schemas,
373        );
374    }
375
376    #[instrument(level = "debug")]
377    fn apply_default_privilege_update(
378        &mut self,
379        default_privilege: mz_catalog::durable::DefaultPrivilege,
380        diff: StateDiff,
381        _retractions: &mut InProgressRetractions,
382    ) {
383        match diff {
384            StateDiff::Addition => self
385                .default_privileges
386                .grant(default_privilege.object, default_privilege.acl_item),
387            StateDiff::Retraction => self
388                .default_privileges
389                .revoke(&default_privilege.object, &default_privilege.acl_item),
390        }
391    }
392
393    #[instrument(level = "debug")]
394    fn apply_system_privilege_update(
395        &mut self,
396        system_privilege: MzAclItem,
397        diff: StateDiff,
398        _retractions: &mut InProgressRetractions,
399    ) {
400        match diff {
401            StateDiff::Addition => self.system_privileges.grant(system_privilege),
402            StateDiff::Retraction => self.system_privileges.revoke(&system_privilege),
403        }
404    }
405
406    #[instrument(level = "debug")]
407    fn apply_system_configuration_update(
408        &mut self,
409        system_configuration: mz_catalog::durable::SystemConfiguration,
410        diff: StateDiff,
411        _retractions: &mut InProgressRetractions,
412    ) {
413        let res = match diff {
414            StateDiff::Addition => self.insert_system_configuration(
415                &system_configuration.name,
416                VarInput::Flat(&system_configuration.value),
417            ),
418            StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
419        };
420        match res {
421            Ok(_) => (),
422            // When system variables are deleted, nothing deletes them from the underlying
423            // durable catalog, which isn't great. Still, we need to be able to ignore
424            // unknown variables.
425            Err(Error {
426                kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
427            }) => {
428                warn!(%name, "unknown system parameter from catalog storage");
429            }
430            Err(e) => panic!("unable to update system variable: {e:?}"),
431        }
432    }
433
434    #[instrument(level = "debug")]
435    fn apply_cluster_update(
436        &mut self,
437        cluster: mz_catalog::durable::Cluster,
438        diff: StateDiff,
439        retractions: &mut InProgressRetractions,
440    ) {
441        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
442        apply_with_update(
443            &mut self.clusters_by_id,
444            cluster,
445            |cluster| cluster.id,
446            diff,
447            &mut retractions.clusters,
448        );
449    }
450
451    #[instrument(level = "debug")]
452    fn apply_network_policy_update(
453        &mut self,
454        policy: mz_catalog::durable::NetworkPolicy,
455        diff: StateDiff,
456        retractions: &mut InProgressRetractions,
457    ) {
458        apply_inverted_lookup(
459            &mut self.network_policies_by_name,
460            &policy.name,
461            policy.id,
462            diff,
463        );
464        apply_with_update(
465            &mut self.network_policies_by_id,
466            policy,
467            |policy| policy.id,
468            diff,
469            &mut retractions.network_policies,
470        );
471    }
472
473    #[instrument(level = "debug")]
474    fn apply_introspection_source_index_update(
475        &mut self,
476        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
477        diff: StateDiff,
478        retractions: &mut InProgressRetractions,
479    ) {
480        let cluster = self
481            .clusters_by_id
482            .get_mut(&introspection_source_index.cluster_id)
483            .expect("catalog out of sync");
484        let log = BUILTIN_LOG_LOOKUP
485            .get(introspection_source_index.name.as_str())
486            .expect("missing log");
487        apply_inverted_lookup(
488            &mut cluster.log_indexes,
489            &log.variant,
490            introspection_source_index.index_id,
491            diff,
492        );
493
494        match diff {
495            StateDiff::Addition => {
496                if let Some(mut entry) = retractions
497                    .introspection_source_indexes
498                    .remove(&introspection_source_index.item_id)
499                {
500                    // This should only happen during startup as a result of builtin migrations. We
501                    // create a new index item and replace the old one with it.
502                    let (index_name, index) = self.create_introspection_source_index(
503                        introspection_source_index.cluster_id,
504                        log,
505                        introspection_source_index.index_id,
506                    );
507                    assert_eq!(entry.id, introspection_source_index.item_id);
508                    assert_eq!(entry.oid, introspection_source_index.oid);
509                    assert_eq!(entry.name, index_name);
510                    entry.item = index;
511                    self.insert_entry(entry);
512                } else {
513                    self.insert_introspection_source_index(
514                        introspection_source_index.cluster_id,
515                        log,
516                        introspection_source_index.item_id,
517                        introspection_source_index.index_id,
518                        introspection_source_index.oid,
519                    );
520                }
521            }
522            StateDiff::Retraction => {
523                let entry = self.drop_item(introspection_source_index.item_id);
524                retractions
525                    .introspection_source_indexes
526                    .insert(entry.id, entry);
527            }
528        }
529    }
530
531    #[instrument(level = "debug")]
532    fn apply_cluster_replica_update(
533        &mut self,
534        cluster_replica: mz_catalog::durable::ClusterReplica,
535        diff: StateDiff,
536        _retractions: &mut InProgressRetractions,
537    ) {
538        let cluster = self
539            .clusters_by_id
540            .get(&cluster_replica.cluster_id)
541            .expect("catalog out of sync");
542        let azs = cluster.availability_zones();
543        let location = self
544            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
545            .expect("catalog in unexpected state");
546        let cluster = self
547            .clusters_by_id
548            .get_mut(&cluster_replica.cluster_id)
549            .expect("catalog out of sync");
550        apply_inverted_lookup(
551            &mut cluster.replica_id_by_name_,
552            &cluster_replica.name,
553            cluster_replica.replica_id,
554            diff,
555        );
556        match diff {
557            StateDiff::Retraction => {
558                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
559                assert!(
560                    prev.is_some(),
561                    "retraction does not match existing value: {:?}",
562                    cluster_replica.replica_id
563                );
564            }
565            StateDiff::Addition => {
566                let logging = ReplicaLogging {
567                    log_logging: cluster_replica.config.logging.log_logging,
568                    interval: cluster_replica.config.logging.interval,
569                };
570                let config = ReplicaConfig {
571                    location,
572                    compute: ComputeReplicaConfig { logging },
573                };
574                let mem_cluster_replica = ClusterReplica {
575                    name: cluster_replica.name.clone(),
576                    cluster_id: cluster_replica.cluster_id,
577                    replica_id: cluster_replica.replica_id,
578                    config,
579                    owner_id: cluster_replica.owner_id,
580                };
581                let prev = cluster
582                    .replicas_by_id_
583                    .insert(cluster_replica.replica_id, mem_cluster_replica);
584                assert_eq!(
585                    prev, None,
586                    "values must be explicitly retracted before inserting a new value: {:?}",
587                    cluster_replica.replica_id
588                );
589            }
590        }
591    }
592
593    #[instrument(level = "debug")]
594    fn apply_system_object_mapping_update(
595        &mut self,
596        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
597        diff: StateDiff,
598        retractions: &mut InProgressRetractions,
599        local_expression_cache: &mut LocalExpressionCache,
600    ) {
601        let item_id = system_object_mapping.unique_identifier.catalog_id;
602        let global_id = system_object_mapping.unique_identifier.global_id;
603
604        if system_object_mapping.unique_identifier.runtime_alterable() {
605            // Runtime-alterable system objects have real entries in the items
606            // collection and so get handled through the normal `insert_item`
607            // and `drop_item` code paths.
608            return;
609        }
610
611        if let StateDiff::Retraction = diff {
612            let entry = self.drop_item(item_id);
613            retractions.system_object_mappings.insert(item_id, entry);
614            return;
615        }
616
617        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
618            // This implies that we updated the fingerprint for some builtin item. The retraction
619            // was parsed, planned, and optimized using the compiled in definition, not the
620            // definition from a previous version. So we can just stick the old entry back into the
621            // catalog.
622            self.insert_entry(entry);
623            return;
624        }
625
626        let builtin = BUILTIN_LOOKUP
627            .get(&system_object_mapping.description)
628            .expect("missing builtin")
629            .1;
630        let schema_name = builtin.schema();
631        let schema_id = self
632            .ambient_schemas_by_name
633            .get(schema_name)
634            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
635        let name = QualifiedItemName {
636            qualifiers: ItemQualifiers {
637                database_spec: ResolvedDatabaseSpecifier::Ambient,
638                schema_spec: SchemaSpecifier::Id(*schema_id),
639            },
640            item: builtin.name().into(),
641        };
642        match builtin {
643            Builtin::Log(log) => {
644                let mut acl_items = vec![rbac::owner_privilege(
645                    mz_sql::catalog::ObjectType::Source,
646                    MZ_SYSTEM_ROLE_ID,
647                )];
648                acl_items.extend_from_slice(&log.access);
649                self.insert_item(
650                    item_id,
651                    log.oid,
652                    name.clone(),
653                    CatalogItem::Log(Log {
654                        variant: log.variant,
655                        global_id,
656                    }),
657                    MZ_SYSTEM_ROLE_ID,
658                    PrivilegeMap::from_mz_acl_items(acl_items),
659                );
660            }
661
662            Builtin::Table(table) => {
663                let mut acl_items = vec![rbac::owner_privilege(
664                    mz_sql::catalog::ObjectType::Table,
665                    MZ_SYSTEM_ROLE_ID,
666                )];
667                acl_items.extend_from_slice(&table.access);
668
669                self.insert_item(
670                    item_id,
671                    table.oid,
672                    name.clone(),
673                    CatalogItem::Table(Table {
674                        create_sql: None,
675                        desc: VersionedRelationDesc::new(table.desc.clone()),
676                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
677                        conn_id: None,
678                        resolved_ids: ResolvedIds::empty(),
679                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
680                            || {
681                                self.system_config()
682                                    .metrics_retention()
683                                    .try_into()
684                                    .expect("invalid metrics retention")
685                            },
686                        ),
687                        is_retained_metrics_object: table.is_retained_metrics_object,
688                        data_source: TableDataSource::TableWrites {
689                            defaults: vec![Expr::null(); table.desc.arity()],
690                        },
691                    }),
692                    MZ_SYSTEM_ROLE_ID,
693                    PrivilegeMap::from_mz_acl_items(acl_items),
694                );
695            }
696            Builtin::Index(index) => {
697                let custom_logical_compaction_window =
698                    index.is_retained_metrics_object.then(|| {
699                        self.system_config()
700                            .metrics_retention()
701                            .try_into()
702                            .expect("invalid metrics retention")
703                    });
704                // Indexes can't be versioned.
705                let versions = BTreeMap::new();
706
707                let item = self
708                    .parse_item(
709                        global_id,
710                        &index.create_sql(),
711                        &versions,
712                        None,
713                        index.is_retained_metrics_object,
714                        custom_logical_compaction_window,
715                        local_expression_cache,
716                        None,
717                    )
718                    .unwrap_or_else(|e| {
719                        panic!(
720                            "internal error: failed to load bootstrap index:\n\
721                                    {}\n\
722                                    error:\n\
723                                    {:?}\n\n\
724                                    make sure that the schema name is specified in the builtin index's create sql statement.",
725                            index.name, e
726                        )
727                    });
728                let CatalogItem::Index(_) = item else {
729                    panic!(
730                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
731                        index.name
732                    );
733                };
734
735                self.insert_item(
736                    item_id,
737                    index.oid,
738                    name,
739                    item,
740                    MZ_SYSTEM_ROLE_ID,
741                    PrivilegeMap::default(),
742                );
743            }
744            Builtin::View(_) => {
745                // parse_views is responsible for inserting all builtin views.
746                unreachable!("views added elsewhere");
747            }
748
749            // Note: Element types must be loaded before array types.
750            Builtin::Type(typ) => {
751                let typ = self.resolve_builtin_type_references(typ);
752                if let CatalogType::Array { element_reference } = typ.details.typ {
753                    let entry = self.get_entry_mut(&element_reference);
754                    let item_type = match &mut entry.item {
755                        CatalogItem::Type(item_type) => item_type,
756                        _ => unreachable!("types can only reference other types"),
757                    };
758                    item_type.details.array_id = Some(item_id);
759                }
760
761                // Assert that no built-in types are record types so that we don't
762                // need to bother to build a description. Only record types need
763                // descriptions.
764                let desc = None;
765                assert!(!matches!(typ.details.typ, CatalogType::Record { .. }));
766                let schema_id = self.resolve_system_schema(typ.schema);
767
768                self.insert_item(
769                    item_id,
770                    typ.oid,
771                    QualifiedItemName {
772                        qualifiers: ItemQualifiers {
773                            database_spec: ResolvedDatabaseSpecifier::Ambient,
774                            schema_spec: SchemaSpecifier::Id(schema_id),
775                        },
776                        item: typ.name.to_owned(),
777                    },
778                    CatalogItem::Type(Type {
779                        create_sql: None,
780                        global_id,
781                        details: typ.details.clone(),
782                        desc,
783                        resolved_ids: ResolvedIds::empty(),
784                    }),
785                    MZ_SYSTEM_ROLE_ID,
786                    PrivilegeMap::from_mz_acl_items(vec![
787                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
788                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
789                    ]),
790                );
791            }
792
793            Builtin::Func(func) => {
794                // This OID is never used. `func` has a `Vec` of implementations and
795                // each implementation has its own OID. Those are the OIDs that are
796                // actually used by the system.
797                let oid = INVALID_OID;
798                self.insert_item(
799                    item_id,
800                    oid,
801                    name.clone(),
802                    CatalogItem::Func(Func {
803                        inner: func.inner,
804                        global_id,
805                    }),
806                    MZ_SYSTEM_ROLE_ID,
807                    PrivilegeMap::default(),
808                );
809            }
810
811            Builtin::Source(coll) => {
812                let mut acl_items = vec![rbac::owner_privilege(
813                    mz_sql::catalog::ObjectType::Source,
814                    MZ_SYSTEM_ROLE_ID,
815                )];
816                acl_items.extend_from_slice(&coll.access);
817
818                self.insert_item(
819                    item_id,
820                    coll.oid,
821                    name.clone(),
822                    CatalogItem::Source(Source {
823                        create_sql: None,
824                        data_source: DataSourceDesc::Introspection(coll.data_source),
825                        desc: coll.desc.clone(),
826                        global_id,
827                        timeline: Timeline::EpochMilliseconds,
828                        resolved_ids: ResolvedIds::empty(),
829                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
830                            || {
831                                self.system_config()
832                                    .metrics_retention()
833                                    .try_into()
834                                    .expect("invalid metrics retention")
835                            },
836                        ),
837                        is_retained_metrics_object: coll.is_retained_metrics_object,
838                    }),
839                    MZ_SYSTEM_ROLE_ID,
840                    PrivilegeMap::from_mz_acl_items(acl_items),
841                );
842            }
843            Builtin::ContinualTask(ct) => {
844                let mut acl_items = vec![rbac::owner_privilege(
845                    mz_sql::catalog::ObjectType::Source,
846                    MZ_SYSTEM_ROLE_ID,
847                )];
848                acl_items.extend_from_slice(&ct.access);
849                // Continual Tasks can't be versioned.
850                let versions = BTreeMap::new();
851
852                let item = self
853                    .parse_item(
854                        global_id,
855                        &ct.create_sql(),
856                        &versions,
857                        None,
858                        false,
859                        None,
860                        local_expression_cache,
861                        None,
862                    )
863                    .unwrap_or_else(|e| {
864                        panic!(
865                            "internal error: failed to load bootstrap continual task:\n\
866                                    {}\n\
867                                    error:\n\
868                                    {:?}\n\n\
869                                    make sure that the schema name is specified in the builtin continual task's create sql statement.",
870                            ct.name, e
871                        )
872                    });
873                let CatalogItem::ContinualTask(_) = &item else {
874                    panic!(
875                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
876                        ct.name
877                    );
878                };
879
880                self.insert_item(
881                    item_id,
882                    ct.oid,
883                    name,
884                    item,
885                    MZ_SYSTEM_ROLE_ID,
886                    PrivilegeMap::from_mz_acl_items(acl_items),
887                );
888            }
889            Builtin::Connection(connection) => {
890                // Connections can't be versioned.
891                let versions = BTreeMap::new();
892                let mut item = self
893                    .parse_item(
894                        global_id,
895                        connection.sql,
896                        &versions,
897                        None,
898                        false,
899                        None,
900                        local_expression_cache,
901                        None,
902                    )
903                    .unwrap_or_else(|e| {
904                        panic!(
905                            "internal error: failed to load bootstrap connection:\n\
906                                    {}\n\
907                                    error:\n\
908                                    {:?}\n\n\
909                                    make sure that the schema name is specified in the builtin connection's create sql statement.",
910                            connection.name, e
911                        )
912                    });
913                let CatalogItem::Connection(_) = &mut item else {
914                    panic!(
915                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
916                        connection.name
917                    );
918                };
919
920                let mut acl_items = vec![rbac::owner_privilege(
921                    mz_sql::catalog::ObjectType::Connection,
922                    connection.owner_id.clone(),
923                )];
924                acl_items.extend_from_slice(connection.access);
925
926                self.insert_item(
927                    item_id,
928                    connection.oid,
929                    name.clone(),
930                    item,
931                    connection.owner_id.clone(),
932                    PrivilegeMap::from_mz_acl_items(acl_items),
933                );
934            }
935        }
936    }
937
938    #[instrument(level = "debug")]
939    fn apply_temporary_item_update(
940        &mut self,
941        TemporaryItem {
942            id,
943            oid,
944            name,
945            item,
946            owner_id,
947            privileges,
948        }: TemporaryItem,
949        diff: StateDiff,
950        retractions: &mut InProgressRetractions,
951    ) {
952        match diff {
953            StateDiff::Addition => {
954                let entry = match retractions.temp_items.remove(&id) {
955                    Some(mut retraction) => {
956                        assert_eq!(retraction.id, id);
957                        retraction.item = item;
958                        retraction.id = id;
959                        retraction.oid = oid;
960                        retraction.name = name;
961                        retraction.owner_id = owner_id;
962                        retraction.privileges = privileges;
963                        retraction
964                    }
965                    None => CatalogEntry {
966                        item,
967                        referenced_by: Vec::new(),
968                        used_by: Vec::new(),
969                        id,
970                        oid,
971                        name,
972                        owner_id,
973                        privileges,
974                    },
975                };
976                self.insert_entry(entry);
977            }
978            StateDiff::Retraction => {
979                let entry = self.drop_item(id);
980                retractions.temp_items.insert(id, entry);
981            }
982        }
983    }
984
985    #[instrument(level = "debug")]
986    fn apply_item_update(
987        &mut self,
988        item: mz_catalog::durable::Item,
989        diff: StateDiff,
990        retractions: &mut InProgressRetractions,
991        local_expression_cache: &mut LocalExpressionCache,
992    ) -> Result<(), CatalogError> {
993        match diff {
994            StateDiff::Addition => {
995                let key = item.key();
996                let mz_catalog::durable::Item {
997                    id,
998                    oid,
999                    global_id,
1000                    schema_id,
1001                    name,
1002                    create_sql,
1003                    owner_id,
1004                    privileges,
1005                    extra_versions,
1006                } = item;
1007                let schema = self.find_non_temp_schema(&schema_id);
1008                let name = QualifiedItemName {
1009                    qualifiers: ItemQualifiers {
1010                        database_spec: schema.database().clone(),
1011                        schema_spec: schema.id().clone(),
1012                    },
1013                    item: name.clone(),
1014                };
1015                let entry = match retractions.items.remove(&key) {
1016                    Some(mut retraction) => {
1017                        assert_eq!(retraction.id, item.id);
1018                        // We only reparse the SQL if it's changed. Otherwise, we use the existing
1019                        // item. This is a performance optimization and not needed for correctness.
1020                        // This makes it difficult to use the `UpdateFrom` trait, but the structure
1021                        // is still the same as the trait.
1022                        if retraction.create_sql() != create_sql {
1023                            let item = self
1024                                .deserialize_item(
1025                                    global_id,
1026                                    &create_sql,
1027                                    &extra_versions,
1028                                    local_expression_cache,
1029                                    Some(retraction.item),
1030                                )
1031                                .unwrap_or_else(|e| {
1032                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
1033                                });
1034                            retraction.item = item;
1035                        }
1036                        retraction.id = id;
1037                        retraction.oid = oid;
1038                        retraction.name = name;
1039                        retraction.owner_id = owner_id;
1040                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1041
1042                        retraction
1043                    }
1044                    None => {
1045                        let catalog_item = self
1046                            .deserialize_item(
1047                                global_id,
1048                                &create_sql,
1049                                &extra_versions,
1050                                local_expression_cache,
1051                                None,
1052                            )
1053                            .unwrap_or_else(|e| {
1054                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1055                            });
1056                        CatalogEntry {
1057                            item: catalog_item,
1058                            referenced_by: Vec::new(),
1059                            used_by: Vec::new(),
1060                            id,
1061                            oid,
1062                            name,
1063                            owner_id,
1064                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1065                        }
1066                    }
1067                };
1068
1069                self.insert_entry(entry);
1070            }
1071            StateDiff::Retraction => {
1072                let entry = self.drop_item(item.id);
1073                let key = item.into_key_value().0;
1074                retractions.items.insert(key, entry);
1075            }
1076        }
1077        Ok(())
1078    }
1079
1080    #[instrument(level = "debug")]
1081    fn apply_comment_update(
1082        &mut self,
1083        comment: mz_catalog::durable::Comment,
1084        diff: StateDiff,
1085        _retractions: &mut InProgressRetractions,
1086    ) {
1087        match diff {
1088            StateDiff::Addition => {
1089                let prev = self.comments.update_comment(
1090                    comment.object_id,
1091                    comment.sub_component,
1092                    Some(comment.comment),
1093                );
1094                assert_eq!(
1095                    prev, None,
1096                    "values must be explicitly retracted before inserting a new value"
1097                );
1098            }
1099            StateDiff::Retraction => {
1100                let prev =
1101                    self.comments
1102                        .update_comment(comment.object_id, comment.sub_component, None);
1103                assert_eq!(
1104                    prev,
1105                    Some(comment.comment),
1106                    "retraction does not match existing value: ({:?}, {:?})",
1107                    comment.object_id,
1108                    comment.sub_component,
1109                );
1110            }
1111        }
1112    }
1113
1114    #[instrument(level = "debug")]
1115    fn apply_source_references_update(
1116        &mut self,
1117        source_references: mz_catalog::durable::SourceReferences,
1118        diff: StateDiff,
1119        _retractions: &mut InProgressRetractions,
1120    ) {
1121        match diff {
1122            StateDiff::Addition => {
1123                let prev = self
1124                    .source_references
1125                    .insert(source_references.source_id, source_references.into());
1126                assert!(
1127                    prev.is_none(),
1128                    "values must be explicitly retracted before inserting a new value: {prev:?}"
1129                );
1130            }
1131            StateDiff::Retraction => {
1132                let prev = self.source_references.remove(&source_references.source_id);
1133                assert!(
1134                    prev.is_some(),
1135                    "retraction for a non-existent existing value: {source_references:?}"
1136                );
1137            }
1138        }
1139    }
1140
1141    #[instrument(level = "debug")]
1142    fn apply_storage_collection_metadata_update(
1143        &mut self,
1144        storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1145        diff: StateDiff,
1146        _retractions: &mut InProgressRetractions,
1147    ) {
1148        apply_inverted_lookup(
1149            &mut self.storage_metadata.collection_metadata,
1150            &storage_collection_metadata.id,
1151            storage_collection_metadata.shard,
1152            diff,
1153        );
1154    }
1155
1156    #[instrument(level = "debug")]
1157    fn apply_unfinalized_shard_update(
1158        &mut self,
1159        unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1160        diff: StateDiff,
1161        _retractions: &mut InProgressRetractions,
1162    ) {
1163        match diff {
1164            StateDiff::Addition => {
1165                let newly_inserted = self
1166                    .storage_metadata
1167                    .unfinalized_shards
1168                    .insert(unfinalized_shard.shard);
1169                assert!(
1170                    newly_inserted,
1171                    "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1172                );
1173            }
1174            StateDiff::Retraction => {
1175                let removed = self
1176                    .storage_metadata
1177                    .unfinalized_shards
1178                    .remove(&unfinalized_shard.shard);
1179                assert!(
1180                    removed,
1181                    "retraction does not match existing value: {unfinalized_shard:?}"
1182                );
1183            }
1184        }
1185    }
1186
1187    /// Generate a list of `BuiltinTableUpdate`s that correspond to a list of updates made to the
1188    /// durable catalog.
1189    #[instrument]
1190    pub(crate) fn generate_builtin_table_updates(
1191        &self,
1192        updates: Vec<StateUpdate>,
1193    ) -> Vec<BuiltinTableUpdate> {
1194        let mut builtin_table_updates = Vec::new();
1195        for StateUpdate { kind, ts: _, diff } in updates {
1196            let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1197            let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1198            builtin_table_updates.extend(builtin_table_update);
1199        }
1200        builtin_table_updates
1201    }
1202
1203    /// Generate a list of `BuiltinTableUpdate`s that correspond to a single update made to the
1204    /// durable catalog.
1205    #[instrument(level = "debug")]
1206    pub(crate) fn generate_builtin_table_update(
1207        &self,
1208        kind: StateUpdateKind,
1209        diff: StateDiff,
1210    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1211        let diff = diff.into();
1212        match kind {
1213            StateUpdateKind::Role(role) => {
1214                let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1215                for group_id in role.membership.map.keys() {
1216                    builtin_table_updates
1217                        .push(self.pack_role_members_update(*group_id, role.id, diff))
1218                }
1219                builtin_table_updates
1220            }
1221            StateUpdateKind::Database(database) => {
1222                vec![self.pack_database_update(&database.id, diff)]
1223            }
1224            StateUpdateKind::Schema(schema) => {
1225                let db_spec = schema.database_id.into();
1226                vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1227            }
1228            StateUpdateKind::DefaultPrivilege(default_privilege) => {
1229                vec![self.pack_default_privileges_update(
1230                    &default_privilege.object,
1231                    &default_privilege.acl_item.grantee,
1232                    &default_privilege.acl_item.acl_mode,
1233                    diff,
1234                )]
1235            }
1236            StateUpdateKind::SystemPrivilege(system_privilege) => {
1237                vec![self.pack_system_privileges_update(system_privilege, diff)]
1238            }
1239            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1240            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1241            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1242                self.pack_item_update(introspection_source_index.item_id, diff)
1243            }
1244            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1245                cluster_replica.cluster_id,
1246                &cluster_replica.name,
1247                diff,
1248            ),
1249            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1250                // Runtime-alterable system objects have real entries in the
1251                // items collection and so get handled through the normal
1252                // `StateUpdateKind::Item`.`
1253                if !system_object_mapping.unique_identifier.runtime_alterable() {
1254                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1255                } else {
1256                    vec![]
1257                }
1258            }
1259            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1260            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1261            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1262                comment.object_id,
1263                comment.sub_component,
1264                &comment.comment,
1265                diff,
1266            )],
1267            StateUpdateKind::SourceReferences(source_references) => {
1268                self.pack_source_references_update(&source_references, diff)
1269            }
1270            StateUpdateKind::AuditLog(audit_log) => {
1271                vec![
1272                    self.pack_audit_log_update(&audit_log.event, diff)
1273                        .expect("could not pack audit log update"),
1274                ]
1275            }
1276            StateUpdateKind::NetworkPolicy(policy) => self
1277                .pack_network_policy_update(&policy.id, diff)
1278                .expect("could not pack audit log update"),
1279            StateUpdateKind::StorageCollectionMetadata(_)
1280            | StateUpdateKind::UnfinalizedShard(_)
1281            | StateUpdateKind::RoleAuth(_) => Vec::new(),
1282        }
1283    }
1284
1285    fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1286        self.entry_by_id
1287            .get_mut(id)
1288            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1289    }
1290
1291    fn get_schema_mut(
1292        &mut self,
1293        database_spec: &ResolvedDatabaseSpecifier,
1294        schema_spec: &SchemaSpecifier,
1295        conn_id: &ConnectionId,
1296    ) -> &mut Schema {
1297        // Keep in sync with `get_schemas`
1298        match (database_spec, schema_spec) {
1299            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1300                .temporary_schemas
1301                .get_mut(conn_id)
1302                .expect("catalog out of sync"),
1303            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1304                .ambient_schemas_by_id
1305                .get_mut(id)
1306                .expect("catalog out of sync"),
1307            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1308                .database_by_id
1309                .get_mut(database_id)
1310                .expect("catalog out of sync")
1311                .schemas_by_id
1312                .get_mut(schema_id)
1313                .expect("catalog out of sync"),
1314            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1315                unreachable!("temporary schemas are in the ambient database")
1316            }
1317        }
1318    }
1319
1320    /// Install builtin views to the catalog. This is its own function so that views can be
1321    /// optimized in parallel.
1322    ///
1323    /// The implementation is similar to `apply_updates_for_bootstrap` and determines dependency
1324    /// problems by sniffing out specific errors and then retrying once those dependencies are
1325    /// complete. This doesn't work for everything (casts, function implementations) so we also need
    /// to have a bucket for everything at the end. Additionally, because this executes in parallel,
1327    /// we must maintain a completed set otherwise races could result in orphaned views languishing
1328    /// in awaiting with nothing retriggering the attempt.
1329    #[instrument(name = "catalog::parse_views")]
1330    async fn parse_builtin_views(
1331        state: &mut CatalogState,
1332        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
1333        retractions: &mut InProgressRetractions,
1334        local_expression_cache: &mut LocalExpressionCache,
1335    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1336        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
1337        let (updates, additions): (Vec<_>, Vec<_>) =
1338            builtin_views
1339                .into_iter()
1340                .partition_map(|(view, item_id, gid)| {
1341                    match retractions.system_object_mappings.remove(&item_id) {
1342                        Some(entry) => Either::Left(entry),
1343                        None => Either::Right((view, item_id, gid)),
1344                    }
1345                });
1346
1347        for entry in updates {
1348            // This implies that we updated the fingerprint for some builtin view. The retraction
1349            // was parsed, planned, and optimized using the compiled in definition, not the
1350            // definition from a previous version. So we can just stick the old entry back into the
1351            // catalog.
1352            let item_id = entry.id();
1353            state.insert_entry(entry);
1354            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
1355        }
1356
1357        let mut handles = Vec::new();
1358        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
1359            BTreeMap::new();
1360        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
1361        // Some errors are due to the implementation of casts or SQL functions that depend on some
1362        // view. Instead of figuring out the exact view dependency, delay these until the end.
1363        let mut awaiting_all = Vec::new();
1364        // Completed views, needed to avoid race conditions.
1365        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
1366        let mut completed_names: BTreeSet<String> = BTreeSet::new();
1367
1368        // Avoid some reference lifetime issues by not passing `builtin` into the spawned task.
1369        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
1370            .into_iter()
1371            .map(|(view, item_id, gid)| (item_id, (view, gid)))
1372            .collect();
1373        let item_ids: Vec<_> = views.keys().copied().collect();
1374
1375        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
1376        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
1377            if handles.is_empty() && ready.is_empty() {
1378                // Enqueue the views that were waiting for all the others.
1379                ready.extend(awaiting_all.drain(..));
1380            }
1381
1382            // Spawn tasks for all ready views.
1383            if !ready.is_empty() {
1384                let spawn_state = Arc::new(state.clone());
1385                while let Some(id) = ready.pop_front() {
1386                    let (view, global_id) = views.get(&id).expect("must exist");
1387                    let global_id = *global_id;
1388                    let create_sql = view.create_sql();
1389                    // Views can't be versioned.
1390                    let versions = BTreeMap::new();
1391
1392                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
1393                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
1394                    let task_state = Arc::clone(&spawn_state);
1395                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1396                    let handle = mz_ore::task::spawn(
1397                        || "parse view",
1398                        async move {
1399                            let res = task_state.parse_item_inner(
1400                                global_id,
1401                                &create_sql,
1402                                &versions,
1403                                None,
1404                                false,
1405                                None,
1406                                cached_expr,
1407                                None,
1408                            );
1409                            (id, global_id, res)
1410                        }
1411                        .instrument(span),
1412                    );
1413                    handles.push(handle);
1414                }
1415            }
1416
1417            // Wait for a view to be ready.
1418            let (handle, _idx, remaining) = future::select_all(handles).await;
1419            handles = remaining;
1420            let (id, global_id, res) = handle.expect("must join");
1421            let mut insert_cached_expr = |cached_expr| {
1422                if let Some(cached_expr) = cached_expr {
1423                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
1424                }
1425            };
1426            match res {
1427                Ok((item, uncached_expr)) => {
1428                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1429                        local_expression_cache.insert_uncached_expression(
1430                            global_id,
1431                            uncached_expr,
1432                            optimizer_features,
1433                        );
1434                    }
1435                    // Add item to catalog.
1436                    let (view, _gid) = views.remove(&id).expect("must exist");
1437                    let schema_id = state
1438                        .ambient_schemas_by_name
1439                        .get(view.schema)
1440                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
1441                    let qname = QualifiedItemName {
1442                        qualifiers: ItemQualifiers {
1443                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1444                            schema_spec: SchemaSpecifier::Id(*schema_id),
1445                        },
1446                        item: view.name.into(),
1447                    };
1448                    let mut acl_items = vec![rbac::owner_privilege(
1449                        mz_sql::catalog::ObjectType::View,
1450                        MZ_SYSTEM_ROLE_ID,
1451                    )];
1452                    acl_items.extend_from_slice(&view.access);
1453
1454                    state.insert_item(
1455                        id,
1456                        view.oid,
1457                        qname,
1458                        item,
1459                        MZ_SYSTEM_ROLE_ID,
1460                        PrivilegeMap::from_mz_acl_items(acl_items),
1461                    );
1462
1463                    // Enqueue any items waiting on this dependency.
1464                    let mut resolved_dependent_items = Vec::new();
1465                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
1466                        resolved_dependent_items.extend(dependent_items);
1467                    }
1468                    let entry = state.get_entry(&id);
1469                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
1470                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
1471                        resolved_dependent_items.extend(dependent_items);
1472                    }
1473                    ready.extend(resolved_dependent_items);
1474
1475                    completed_ids.insert(id);
1476                    completed_names.insert(full_name);
1477                }
1478                // If we were missing a dependency, wait for it to be added.
1479                Err((
1480                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
1481                    cached_expr,
1482                )) => {
1483                    insert_cached_expr(cached_expr);
1484                    if completed_ids.contains(&missing_dep) {
1485                        ready.push_back(id);
1486                    } else {
1487                        awaiting_id_dependencies
1488                            .entry(missing_dep)
1489                            .or_default()
1490                            .push(id);
1491                    }
1492                }
1493                // If we were missing a dependency, wait for it to be added.
1494                Err((
1495                    AdapterError::PlanError(plan::PlanError::Catalog(
1496                        SqlCatalogError::UnknownItem(missing_dep),
1497                    )),
1498                    cached_expr,
1499                )) => {
1500                    insert_cached_expr(cached_expr);
1501                    match CatalogItemId::from_str(&missing_dep) {
1502                        Ok(missing_dep) => {
1503                            if completed_ids.contains(&missing_dep) {
1504                                ready.push_back(id);
1505                            } else {
1506                                awaiting_id_dependencies
1507                                    .entry(missing_dep)
1508                                    .or_default()
1509                                    .push(id);
1510                            }
1511                        }
1512                        Err(_) => {
1513                            if completed_names.contains(&missing_dep) {
1514                                ready.push_back(id);
1515                            } else {
1516                                awaiting_name_dependencies
1517                                    .entry(missing_dep)
1518                                    .or_default()
1519                                    .push(id);
1520                            }
1521                        }
1522                    }
1523                }
1524                Err((
1525                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
1526                    cached_expr,
1527                )) => {
1528                    insert_cached_expr(cached_expr);
1529                    awaiting_all.push(id);
1530                }
1531                Err((e, _)) => {
1532                    let (bad_view, _gid) = views.get(&id).expect("must exist");
1533                    panic!(
1534                        "internal error: failed to load bootstrap view:\n\
1535                            {name}\n\
1536                            error:\n\
1537                            {e:?}\n\n\
1538                            Make sure that the schema name is specified in the builtin view's create sql statement.
1539                            ",
1540                        name = bad_view.name,
1541                    )
1542                }
1543            }
1544        }
1545
1546        assert!(awaiting_id_dependencies.is_empty());
1547        assert!(
1548            awaiting_name_dependencies.is_empty(),
1549            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
1550        );
1551        assert!(awaiting_all.is_empty());
1552        assert!(views.is_empty());
1553
1554        // Generate a builtin table update for all the new views.
1555        builtin_table_updates.extend(
1556            item_ids
1557                .into_iter()
1558                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
1559        );
1560
1561        builtin_table_updates
1562    }
1563
1564    /// Associates a name, `CatalogItemId`, and entry.
1565    fn insert_entry(&mut self, entry: CatalogEntry) {
1566        if !entry.id.is_system() {
1567            if let Some(cluster_id) = entry.item.cluster_id() {
1568                self.clusters_by_id
1569                    .get_mut(&cluster_id)
1570                    .expect("catalog out of sync")
1571                    .bound_objects
1572                    .insert(entry.id);
1573            };
1574        }
1575
1576        for u in entry.references().items() {
1577            match self.entry_by_id.get_mut(u) {
1578                Some(metadata) => metadata.referenced_by.push(entry.id()),
1579                None => panic!(
1580                    "Catalog: missing dependent catalog item {} while installing {}",
1581                    &u,
1582                    self.resolve_full_name(entry.name(), entry.conn_id())
1583                ),
1584            }
1585        }
1586        for u in entry.uses() {
1587            // Ignore self for self-referential tasks (e.g. Continual Tasks), if
1588            // present.
1589            if u == entry.id() {
1590                continue;
1591            }
1592            match self.entry_by_id.get_mut(&u) {
1593                Some(metadata) => metadata.used_by.push(entry.id()),
1594                None => panic!(
1595                    "Catalog: missing dependent catalog item {} while installing {}",
1596                    &u,
1597                    self.resolve_full_name(entry.name(), entry.conn_id())
1598                ),
1599            }
1600        }
1601        for gid in entry.item.global_ids() {
1602            self.entry_by_global_id.insert(gid, entry.id());
1603        }
1604        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1605        let schema = self.get_schema_mut(
1606            &entry.name().qualifiers.database_spec,
1607            &entry.name().qualifiers.schema_spec,
1608            conn_id,
1609        );
1610
1611        let prev_id = match entry.item() {
1612            CatalogItem::Func(_) => schema
1613                .functions
1614                .insert(entry.name().item.clone(), entry.id()),
1615            CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1616            _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1617        };
1618
1619        assert!(
1620            prev_id.is_none(),
1621            "builtin name collision on {:?}",
1622            entry.name().item.clone()
1623        );
1624
1625        self.entry_by_id.insert(entry.id(), entry.clone());
1626    }
1627
1628    /// Associates a name, [`CatalogItemId`], and entry.
1629    fn insert_item(
1630        &mut self,
1631        id: CatalogItemId,
1632        oid: u32,
1633        name: QualifiedItemName,
1634        item: CatalogItem,
1635        owner_id: RoleId,
1636        privileges: PrivilegeMap,
1637    ) {
1638        let entry = CatalogEntry {
1639            item,
1640            name,
1641            id,
1642            oid,
1643            used_by: Vec::new(),
1644            referenced_by: Vec::new(),
1645            owner_id,
1646            privileges,
1647        };
1648
1649        self.insert_entry(entry);
1650    }
1651
1652    #[mz_ore::instrument(level = "trace")]
1653    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1654        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1655        for u in metadata.references().items() {
1656            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1657                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1658            }
1659        }
1660        for u in metadata.uses() {
1661            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1662                dep_metadata.used_by.retain(|u| *u != metadata.id())
1663            }
1664        }
1665        for gid in metadata.global_ids() {
1666            self.entry_by_global_id.remove(&gid);
1667        }
1668
1669        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1670        let schema = self.get_schema_mut(
1671            &metadata.name().qualifiers.database_spec,
1672            &metadata.name().qualifiers.schema_spec,
1673            conn_id,
1674        );
1675        if metadata.item_type() == CatalogItemType::Type {
1676            schema
1677                .types
1678                .remove(&metadata.name().item)
1679                .expect("catalog out of sync");
1680        } else {
1681            // Functions would need special handling, but we don't yet support
1682            // dropping functions.
1683            assert_ne!(metadata.item_type(), CatalogItemType::Func);
1684
1685            schema
1686                .items
1687                .remove(&metadata.name().item)
1688                .expect("catalog out of sync");
1689        };
1690
1691        if !id.is_system() {
1692            if let Some(cluster_id) = metadata.item().cluster_id() {
1693                assert!(
1694                    self.clusters_by_id
1695                        .get_mut(&cluster_id)
1696                        .expect("catalog out of sync")
1697                        .bound_objects
1698                        .remove(&id),
1699                    "catalog out of sync"
1700                );
1701            }
1702        }
1703
1704        metadata
1705    }
1706
1707    fn insert_introspection_source_index(
1708        &mut self,
1709        cluster_id: ClusterId,
1710        log: &'static BuiltinLog,
1711        item_id: CatalogItemId,
1712        global_id: GlobalId,
1713        oid: u32,
1714    ) {
1715        let (index_name, index) =
1716            self.create_introspection_source_index(cluster_id, log, global_id);
1717        self.insert_item(
1718            item_id,
1719            oid,
1720            index_name,
1721            index,
1722            MZ_SYSTEM_ROLE_ID,
1723            PrivilegeMap::default(),
1724        );
1725    }
1726
1727    fn create_introspection_source_index(
1728        &self,
1729        cluster_id: ClusterId,
1730        log: &'static BuiltinLog,
1731        global_id: GlobalId,
1732    ) -> (QualifiedItemName, CatalogItem) {
1733        let source_name = FullItemName {
1734            database: RawDatabaseSpecifier::Ambient,
1735            schema: log.schema.into(),
1736            item: log.name.into(),
1737        };
1738        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1739        let mut index_name = QualifiedItemName {
1740            qualifiers: ItemQualifiers {
1741                database_spec: ResolvedDatabaseSpecifier::Ambient,
1742                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1743            },
1744            item: index_name.clone(),
1745        };
1746        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1747        let index_item_name = index_name.item.clone();
1748        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1749        let index = CatalogItem::Index(Index {
1750            global_id,
1751            on: log_global_id,
1752            keys: log
1753                .variant
1754                .index_by()
1755                .into_iter()
1756                .map(MirScalarExpr::Column)
1757                .collect(),
1758            create_sql: index_sql(
1759                index_item_name,
1760                cluster_id,
1761                source_name,
1762                &log.variant.desc(),
1763                &log.variant.index_by(),
1764            ),
1765            conn_id: None,
1766            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1767            cluster_id,
1768            is_retained_metrics_object: false,
1769            custom_logical_compaction_window: None,
1770        });
1771        (index_name, index)
1772    }
1773
1774    /// Insert system configuration `name` with `value`.
1775    ///
1776    /// Return a `bool` value indicating whether the configuration was modified
1777    /// by the call.
1778    fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1779        Ok(self.system_configuration.set(name, value)?)
1780    }
1781
1782    /// Reset system configuration `name`.
1783    ///
1784    /// Return a `bool` value indicating whether the configuration was modified
1785    /// by the call.
1786    fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1787        Ok(self.system_configuration.reset(name)?)
1788    }
1789}
1790
1791/// Sort [`StateUpdate`]s in timestamp then dependency order
1792fn sort_updates(mut updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1793    let mut sorted_updates = Vec::with_capacity(updates.len());
1794
1795    updates.sort_by_key(|update| update.ts);
1796    for (_, updates) in &updates.into_iter().group_by(|update| update.ts) {
1797        let sorted_ts_updates = sort_updates_inner(updates.collect());
1798        sorted_updates.extend(sorted_ts_updates);
1799    }
1800
1801    sorted_updates
1802}
1803
1804/// Sort [`StateUpdate`]s in dependency order for a single timestamp.
1805fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1806    fn push_update<T>(
1807        update: T,
1808        diff: StateDiff,
1809        retractions: &mut Vec<T>,
1810        additions: &mut Vec<T>,
1811    ) {
1812        match diff {
1813            StateDiff::Retraction => retractions.push(update),
1814            StateDiff::Addition => additions.push(update),
1815        }
1816    }
1817
1818    soft_assert_no_log!(
1819        updates.iter().map(|update| update.ts).all_equal(),
1820        "all timestamps should be equal: {updates:?}"
1821    );
1822
1823    // Partition updates by type so that we can weave different update types into the right spots.
1824    let mut pre_cluster_retractions = Vec::new();
1825    let mut pre_cluster_additions = Vec::new();
1826    let mut cluster_retractions = Vec::new();
1827    let mut cluster_additions = Vec::new();
1828    let mut builtin_item_updates = Vec::new();
1829    let mut item_retractions = Vec::new();
1830    let mut item_additions = Vec::new();
1831    let mut temp_item_retractions = Vec::new();
1832    let mut temp_item_additions = Vec::new();
1833    let mut post_item_retractions = Vec::new();
1834    let mut post_item_additions = Vec::new();
1835    for update in updates {
1836        let diff = update.diff.clone();
1837        match update.kind {
1838            StateUpdateKind::Role(_)
1839            | StateUpdateKind::RoleAuth(_)
1840            | StateUpdateKind::Database(_)
1841            | StateUpdateKind::Schema(_)
1842            | StateUpdateKind::DefaultPrivilege(_)
1843            | StateUpdateKind::SystemPrivilege(_)
1844            | StateUpdateKind::SystemConfiguration(_)
1845            | StateUpdateKind::NetworkPolicy(_) => push_update(
1846                update,
1847                diff,
1848                &mut pre_cluster_retractions,
1849                &mut pre_cluster_additions,
1850            ),
1851            StateUpdateKind::Cluster(_)
1852            | StateUpdateKind::IntrospectionSourceIndex(_)
1853            | StateUpdateKind::ClusterReplica(_) => push_update(
1854                update,
1855                diff,
1856                &mut cluster_retractions,
1857                &mut cluster_additions,
1858            ),
1859            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1860                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
1861            }
1862            StateUpdateKind::TemporaryItem(item) => push_update(
1863                (item, update.ts, update.diff),
1864                diff,
1865                &mut temp_item_retractions,
1866                &mut temp_item_additions,
1867            ),
1868            StateUpdateKind::Item(item) => push_update(
1869                (item, update.ts, update.diff),
1870                diff,
1871                &mut item_retractions,
1872                &mut item_additions,
1873            ),
1874            StateUpdateKind::Comment(_)
1875            | StateUpdateKind::SourceReferences(_)
1876            | StateUpdateKind::AuditLog(_)
1877            | StateUpdateKind::StorageCollectionMetadata(_)
1878            | StateUpdateKind::UnfinalizedShard(_) => push_update(
1879                update,
1880                diff,
1881                &mut post_item_retractions,
1882                &mut post_item_additions,
1883            ),
1884        }
1885    }
1886
1887    // Sort builtin item updates by dependency.
1888    let builtin_item_updates = builtin_item_updates
1889        .into_iter()
1890        .map(|(system_object_mapping, ts, diff)| {
1891            let idx = BUILTIN_LOOKUP
1892                .get(&system_object_mapping.description)
1893                .expect("missing builtin")
1894                .0;
1895            (idx, system_object_mapping, ts, diff)
1896        })
1897        .sorted_by_key(|(idx, _, _, _)| *idx)
1898        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
1899
1900    // Further partition builtin item updates.
1901    let mut other_builtin_retractions = Vec::new();
1902    let mut other_builtin_additions = Vec::new();
1903    let mut builtin_index_retractions = Vec::new();
1904    let mut builtin_index_additions = Vec::new();
1905    for (builtin_item_update, ts, diff) in builtin_item_updates {
1906        match &builtin_item_update.description.object_type {
1907            CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
1908                StateUpdate {
1909                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
1910                    ts,
1911                    diff,
1912                },
1913                diff,
1914                &mut builtin_index_retractions,
1915                &mut builtin_index_additions,
1916            ),
1917            CatalogItemType::Table
1918            | CatalogItemType::Source
1919            | CatalogItemType::Sink
1920            | CatalogItemType::View
1921            | CatalogItemType::MaterializedView
1922            | CatalogItemType::Type
1923            | CatalogItemType::Func
1924            | CatalogItemType::Secret
1925            | CatalogItemType::Connection => push_update(
1926                StateUpdate {
1927                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
1928                    ts,
1929                    diff,
1930                },
1931                diff,
1932                &mut other_builtin_retractions,
1933                &mut other_builtin_additions,
1934            ),
1935        }
1936    }
1937
1938    /// Sort item updates by dependency.
1939    ///
1940    /// First we group items into groups that are totally ordered by dependency. For example, when
1941    /// sorting all items by dependency we know that all tables can come after all sources, because
1942    /// a source can never depend on a table. Within these groups, the ID order matches the
1943    /// dependency order.
1944    ///
1945    /// It used to be the case that the ID order of ALL items matched the dependency order. However,
1946    /// certain migrations shuffled item IDs around s.t. this was no longer true. A much better
1947    /// approach would be to investigate each item, discover their exact dependencies, and then
1948    /// perform a topological sort. This is non-trivial because we only have the CREATE SQL of each
1949    /// item here. Within the SQL the dependent items are sometimes referred to by ID and sometimes
1950    /// referred to by name.
1951    ///
1952    /// The logic of this function should match [`sort_temp_item_updates`].
1953    fn sort_item_updates(
1954        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
1955    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
1956        // Partition items into groups s.t. each item in one group has a predefined order with all
1957        // items in other groups. For example, all sinks are ordered greater than all tables.
1958        let mut types = Vec::new();
1959        // N.B. Functions can depend on system tables, but not user tables.
1960        // TODO(udf): This will change when UDFs are supported.
1961        let mut funcs = Vec::new();
1962        let mut secrets = Vec::new();
1963        let mut connections = Vec::new();
1964        let mut sources = Vec::new();
1965        let mut tables = Vec::new();
1966        let mut derived_items = Vec::new();
1967        let mut sinks = Vec::new();
1968        let mut continual_tasks = Vec::new();
1969
1970        for update in item_updates {
1971            match update.0.item_type() {
1972                CatalogItemType::Type => types.push(update),
1973                CatalogItemType::Func => funcs.push(update),
1974                CatalogItemType::Secret => secrets.push(update),
1975                CatalogItemType::Connection => connections.push(update),
1976                CatalogItemType::Source => sources.push(update),
1977                CatalogItemType::Table => tables.push(update),
1978                CatalogItemType::View
1979                | CatalogItemType::MaterializedView
1980                | CatalogItemType::Index => derived_items.push(update),
1981                CatalogItemType::Sink => sinks.push(update),
1982                CatalogItemType::ContinualTask => continual_tasks.push(update),
1983            }
1984        }
1985
1986        // Within each group, sort by ID.
1987        for group in [
1988            &mut types,
1989            &mut funcs,
1990            &mut secrets,
1991            &mut connections,
1992            &mut sources,
1993            &mut tables,
1994            &mut derived_items,
1995            &mut sinks,
1996            &mut continual_tasks,
1997        ] {
1998            group.sort_by_key(|(item, _, _)| item.id);
1999        }
2000
2001        iter::empty()
2002            .chain(types)
2003            .chain(funcs)
2004            .chain(secrets)
2005            .chain(connections)
2006            .chain(sources)
2007            .chain(tables)
2008            .chain(derived_items)
2009            .chain(sinks)
2010            .chain(continual_tasks)
2011            .collect()
2012    }
2013
2014    let item_retractions = sort_item_updates(item_retractions);
2015    let item_additions = sort_item_updates(item_additions);
2016
2017    /// Sort temporary item updates by dependency.
2018    ///
2019    /// The logic of this function should match [`sort_item_updates`].
2020    fn sort_temp_item_updates(
2021        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2022    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2023        // Partition items into groups s.t. each item in one group has a predefined order with all
2024        // items in other groups. For example, all sinks are ordered greater than all tables.
2025        let mut types = Vec::new();
2026        // N.B. Functions can depend on system tables, but not user tables.
2027        let mut funcs = Vec::new();
2028        let mut secrets = Vec::new();
2029        let mut connections = Vec::new();
2030        let mut sources = Vec::new();
2031        let mut tables = Vec::new();
2032        let mut derived_items = Vec::new();
2033        let mut sinks = Vec::new();
2034        let mut continual_tasks = Vec::new();
2035
2036        for update in temp_item_updates {
2037            match update.0.item.typ() {
2038                CatalogItemType::Type => types.push(update),
2039                CatalogItemType::Func => funcs.push(update),
2040                CatalogItemType::Secret => secrets.push(update),
2041                CatalogItemType::Connection => connections.push(update),
2042                CatalogItemType::Source => sources.push(update),
2043                CatalogItemType::Table => tables.push(update),
2044                CatalogItemType::View
2045                | CatalogItemType::MaterializedView
2046                | CatalogItemType::Index => derived_items.push(update),
2047                CatalogItemType::Sink => sinks.push(update),
2048                CatalogItemType::ContinualTask => continual_tasks.push(update),
2049            }
2050        }
2051
2052        // Within each group, sort by ID.
2053        for group in [
2054            &mut types,
2055            &mut funcs,
2056            &mut secrets,
2057            &mut connections,
2058            &mut sources,
2059            &mut tables,
2060            &mut derived_items,
2061            &mut sinks,
2062            &mut continual_tasks,
2063        ] {
2064            group.sort_by_key(|(item, _, _)| item.id);
2065        }
2066
2067        iter::empty()
2068            .chain(types)
2069            .chain(funcs)
2070            .chain(secrets)
2071            .chain(connections)
2072            .chain(sources)
2073            .chain(tables)
2074            .chain(derived_items)
2075            .chain(sinks)
2076            .chain(continual_tasks)
2077            .collect()
2078    }
2079    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2080    let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2081
2082    /// Merge sorted temporary and non-temp items.
2083    fn merge_item_updates(
2084        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2085        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2086    ) -> Vec<StateUpdate> {
2087        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2088
2089        while let (Some((item, _, _)), Some((temp_item, _, _))) =
2090            (item_updates.front(), temp_item_updates.front())
2091        {
2092            if item.id < temp_item.id {
2093                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2094                state_updates.push(StateUpdate {
2095                    kind: StateUpdateKind::Item(item),
2096                    ts,
2097                    diff,
2098                });
2099            } else if item.id > temp_item.id {
2100                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2101                state_updates.push(StateUpdate {
2102                    kind: StateUpdateKind::TemporaryItem(temp_item),
2103                    ts,
2104                    diff,
2105                });
2106            } else {
2107                unreachable!(
2108                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2109                );
2110            }
2111        }
2112
2113        while let Some((item, ts, diff)) = item_updates.pop_front() {
2114            state_updates.push(StateUpdate {
2115                kind: StateUpdateKind::Item(item),
2116                ts,
2117                diff,
2118            });
2119        }
2120
2121        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2122            state_updates.push(StateUpdate {
2123                kind: StateUpdateKind::TemporaryItem(temp_item),
2124                ts,
2125                diff,
2126            });
2127        }
2128
2129        state_updates
2130    }
2131    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2132    let item_additions = merge_item_updates(item_additions, temp_item_additions);
2133
2134    // Put everything back together.
2135    iter::empty()
2136        // All retractions must be reversed.
2137        .chain(post_item_retractions.into_iter().rev())
2138        .chain(item_retractions.into_iter().rev())
2139        .chain(builtin_index_retractions.into_iter().rev())
2140        .chain(cluster_retractions.into_iter().rev())
2141        .chain(other_builtin_retractions.into_iter().rev())
2142        .chain(pre_cluster_retractions.into_iter().rev())
2143        .chain(pre_cluster_additions)
2144        .chain(other_builtin_additions)
2145        .chain(cluster_additions)
2146        .chain(builtin_index_additions)
2147        .chain(item_additions)
2148        .chain(post_item_additions)
2149        .collect()
2150}
2151
/// Most updates are applied one at a time, but during bootstrap, certain types are applied
/// separately in a batch for performance reasons. A constraint is that updates must be applied in
/// order. This process is modeled as a state machine that batches then applies groups of updates.
///
/// Each variant holds the batch of one kind of update.
enum BootstrapApplyState {
    /// Additions of builtin views.
    ///
    /// Each element is a builtin view definition together with the `CatalogItemId` and
    /// `GlobalId` that accompany it.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Item updates that aren't builtin view additions.
    Items(Vec<StateUpdate>),
    /// All other updates.
    Updates(Vec<StateUpdate>),
}
2163
2164impl BootstrapApplyState {
2165    fn new(update: StateUpdate) -> BootstrapApplyState {
2166        match update {
2167            StateUpdate {
2168                kind: StateUpdateKind::SystemObjectMapping(system_object_mapping),
2169                diff: StateDiff::Addition,
2170                ..
2171            } if matches!(
2172                system_object_mapping.description.object_type,
2173                CatalogItemType::View
2174            ) =>
2175            {
2176                let view_addition = lookup_builtin_view_addition(system_object_mapping);
2177                BootstrapApplyState::BuiltinViewAdditions(vec![view_addition])
2178            }
2179            StateUpdate {
2180                kind: StateUpdateKind::IntrospectionSourceIndex(_),
2181                ..
2182            }
2183            | StateUpdate {
2184                kind: StateUpdateKind::SystemObjectMapping(_),
2185                ..
2186            }
2187            | StateUpdate {
2188                kind: StateUpdateKind::Item(_),
2189                ..
2190            } => BootstrapApplyState::Items(vec![update]),
2191            update => BootstrapApplyState::Updates(vec![update]),
2192        }
2193    }
2194
2195    /// Apply all updates that have been batched in `self`.
2196    ///
2197    /// We make sure to enable all "enable_for_item_parsing" feature flags when applying item
2198    /// updates during bootstrap. See [`CatalogState::with_enable_for_item_parsing`] for more
2199    /// details.
2200    async fn apply(
2201        self,
2202        state: &mut CatalogState,
2203        retractions: &mut InProgressRetractions,
2204        local_expression_cache: &mut LocalExpressionCache,
2205    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2206        match self {
2207            BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions) => {
2208                let restore = state.system_configuration.clone();
2209                state.system_configuration.enable_for_item_parsing();
2210                let builtin_table_updates = CatalogState::parse_builtin_views(
2211                    state,
2212                    builtin_view_additions,
2213                    retractions,
2214                    local_expression_cache,
2215                )
2216                .await;
2217                state.system_configuration = restore;
2218                builtin_table_updates
2219            }
2220            BootstrapApplyState::Items(updates) => state.with_enable_for_item_parsing(|state| {
2221                state
2222                    .apply_updates_inner(updates, retractions, local_expression_cache)
2223                    .expect("corrupt catalog")
2224            }),
2225            BootstrapApplyState::Updates(updates) => state
2226                .apply_updates_inner(updates, retractions, local_expression_cache)
2227                .expect("corrupt catalog"),
2228        }
2229    }
2230
2231    async fn step(
2232        self,
2233        next: BootstrapApplyState,
2234        state: &mut CatalogState,
2235        retractions: &mut InProgressRetractions,
2236        local_expression_cache: &mut LocalExpressionCache,
2237    ) -> (
2238        BootstrapApplyState,
2239        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2240    ) {
2241        match (self, next) {
2242            (
2243                BootstrapApplyState::BuiltinViewAdditions(mut builtin_view_additions),
2244                BootstrapApplyState::BuiltinViewAdditions(next_builtin_view_additions),
2245            ) => {
2246                // Continue batching builtin view additions.
2247                builtin_view_additions.extend(next_builtin_view_additions);
2248                (
2249                    BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions),
2250                    Vec::new(),
2251                )
2252            }
2253            (BootstrapApplyState::Items(mut updates), BootstrapApplyState::Items(next_updates)) => {
2254                // Continue batching item updates.
2255                updates.extend(next_updates);
2256                (BootstrapApplyState::Items(updates), Vec::new())
2257            }
2258            (
2259                BootstrapApplyState::Updates(mut updates),
2260                BootstrapApplyState::Updates(next_updates),
2261            ) => {
2262                // Continue batching updates.
2263                updates.extend(next_updates);
2264                (BootstrapApplyState::Updates(updates), Vec::new())
2265            }
2266            (apply_state, next_apply_state) => {
2267                // Apply the current batch and start batching new apply state.
2268                let builtin_table_update = apply_state
2269                    .apply(state, retractions, local_expression_cache)
2270                    .await;
2271                (next_apply_state, builtin_table_update)
2272            }
2273        }
2274    }
2275}
2276
2277/// Helper method to updated inverted lookup maps. The keys are generally names and the values are
2278/// generally IDs.
2279///
2280/// Importantly, when retracting it's expected that the existing value will match `value` exactly.
2281fn apply_inverted_lookup<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: V, diff: StateDiff)
2282where
2283    K: Ord + Clone + Debug,
2284    V: PartialEq + Debug,
2285{
2286    match diff {
2287        StateDiff::Retraction => {
2288            let prev = map.remove(key);
2289            assert_eq!(
2290                prev,
2291                Some(value),
2292                "retraction does not match existing value: {key:?}"
2293            );
2294        }
2295        StateDiff::Addition => {
2296            let prev = map.insert(key.clone(), value);
2297            assert_eq!(
2298                prev, None,
2299                "values must be explicitly retracted before inserting a new value: {key:?}"
2300            );
2301        }
2302    }
2303}
2304
2305/// Helper method to update catalog state, that may need to be updated from a previously retracted
2306/// object.
2307fn apply_with_update<K, V, D>(
2308    map: &mut BTreeMap<K, V>,
2309    durable: D,
2310    key_fn: impl FnOnce(&D) -> K,
2311    diff: StateDiff,
2312    retractions: &mut BTreeMap<D::Key, V>,
2313) where
2314    K: Ord,
2315    V: UpdateFrom<D> + PartialEq + Debug,
2316    D: DurableType,
2317    D::Key: Ord,
2318{
2319    match diff {
2320        StateDiff::Retraction => {
2321            let mem_key = key_fn(&durable);
2322            let value = map
2323                .remove(&mem_key)
2324                .expect("retraction does not match existing value: {key:?}");
2325            let durable_key = durable.into_key_value().0;
2326            retractions.insert(durable_key, value);
2327        }
2328        StateDiff::Addition => {
2329            let mem_key = key_fn(&durable);
2330            let durable_key = durable.key();
2331            let value = match retractions.remove(&durable_key) {
2332                Some(mut retraction) => {
2333                    retraction.update_from(durable);
2334                    retraction
2335                }
2336                None => durable.into(),
2337            };
2338            let prev = map.insert(mem_key, value);
2339            assert_eq!(
2340                prev, None,
2341                "values must be explicitly retracted before inserting a new value"
2342            );
2343        }
2344    }
2345}
2346
2347/// Looks up a [`BuiltinView`] from a [`SystemObjectMapping`].
2348fn lookup_builtin_view_addition(
2349    mapping: SystemObjectMapping,
2350) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2351    let (_, builtin) = BUILTIN_LOOKUP
2352        .get(&mapping.description)
2353        .expect("missing builtin view");
2354    let Builtin::View(view) = builtin else {
2355        unreachable!("programming error, expected BuiltinView found {builtin:?}");
2356    };
2357
2358    (
2359        view,
2360        mapping.unique_identifier.catalog_id,
2361        mapping.unique_identifier.global_id,
2362    )
2363}