1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures::future;
20use itertools::{Either, Itertools};
21use mz_adapter_types::connection::ConnectionId;
22use mz_catalog::SYSTEM_CONN_ID;
23use mz_catalog::builtin::{
24    BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
25};
26use mz_catalog::durable::objects::{
27    ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
28    SchemaKey,
29};
30use mz_catalog::durable::{CatalogError, SystemObjectMapping};
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
34    NetworkPolicy, Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
35    TableDataSource, TemporaryItem, Type, UpdateFrom,
36};
37use mz_compute_types::config::ComputeReplicaConfig;
38use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
39use mz_controller_types::ClusterId;
40use mz_expr::MirScalarExpr;
41use mz_ore::collections::CollectionExt;
42use mz_ore::tracing::OpenTelemetryContext;
43use mz_ore::{assert_none, instrument, soft_assert_no_log};
44use mz_pgrepr::oid::INVALID_OID;
45use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
48use mz_sql::catalog::CatalogError as SqlCatalogError;
49use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
50use mz_sql::names::{
51    FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
52    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
53};
54use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
55use mz_sql::session::vars::{VarError, VarInput};
56use mz_sql::{plan, rbac};
57use mz_sql_parser::ast::Expr;
58use mz_storage_types::sources::Timeline;
59use tracing::{Instrument, info_span, warn};
60
61use crate::AdapterError;
62use crate::catalog::state::LocalExpressionCache;
63use crate::catalog::{BuiltinTableUpdate, CatalogState};
64use crate::util::index_sql;
65
/// Values removed from the in-memory catalog by retractions that have not (yet) been matched
/// by an addition.
///
/// `CatalogState::apply_updates` and `CatalogState::apply_updates_for_bootstrap` create one
/// empty instance per group of same-timestamp updates and pass it to every handler applied for
/// that group. For example, retracting an introspection source index stores the dropped
/// `CatalogEntry` here, and a later addition for the same item ID takes it back out and
/// re-inserts it instead of building a brand new entry.
#[derive(Debug, Clone, Default)]
struct InProgressRetractions {
    /// In-memory roles, keyed by their durable role key; handed to `apply_with_update` by
    /// `apply_role_update`.
    roles: BTreeMap<RoleKey, Role>,
    /// In-memory role auth values, keyed by their durable key; handed to `apply_with_update`
    /// by `apply_role_auth_update`.
    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
    /// In-memory databases, keyed by their durable key; handed to `apply_with_update` by
    /// `apply_database_update`.
    databases: BTreeMap<DatabaseKey, Database>,
    /// In-memory schemas, keyed by their durable key; handed to `apply_with_update` by
    /// `apply_schema_update`.
    schemas: BTreeMap<SchemaKey, Schema>,
    /// In-memory clusters, keyed by their durable key; handed to `apply_with_update` by
    /// `apply_cluster_update`.
    clusters: BTreeMap<ClusterKey, Cluster>,
    /// In-memory network policies, keyed by their durable key; handed to `apply_with_update`
    /// by `apply_network_policy_update`.
    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
    /// Catalog entries of items, keyed by their durable item key.
    items: BTreeMap<ItemKey, CatalogEntry>,
    /// Catalog entries of temporary items, keyed by catalog item ID.
    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
    /// Entries dropped by introspection source index retractions, keyed by catalog item ID.
    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
    /// Entries dropped by system object mapping retractions, keyed by catalog item ID.
    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}
90
91impl CatalogState {
92    #[must_use]
99    #[instrument]
100    pub(crate) async fn apply_updates_for_bootstrap(
101        &mut self,
102        updates: Vec<StateUpdate>,
103        local_expression_cache: &mut LocalExpressionCache,
104    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
105        let mut builtin_table_updates = Vec::with_capacity(updates.len());
106        let updates = sort_updates(updates);
107
108        let mut groups: Vec<Vec<_>> = Vec::new();
109        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
110            groups.push(updates.collect());
111        }
112        for updates in groups {
113            let mut apply_state = BootstrapApplyState::Updates(Vec::new());
114            let mut retractions = InProgressRetractions::default();
115
116            for update in updates {
117                let next_apply_state = BootstrapApplyState::new(update);
118                let (next_apply_state, builtin_table_update) = apply_state
119                    .step(
120                        next_apply_state,
121                        self,
122                        &mut retractions,
123                        local_expression_cache,
124                    )
125                    .await;
126                apply_state = next_apply_state;
127                builtin_table_updates.extend(builtin_table_update);
128            }
129
130            let builtin_table_update = apply_state
132                .apply(self, &mut retractions, local_expression_cache)
133                .await;
134            builtin_table_updates.extend(builtin_table_update);
135        }
136        builtin_table_updates
137    }
138
139    #[instrument]
143    pub(crate) fn apply_updates(
144        &mut self,
145        updates: Vec<StateUpdate>,
146    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
147        let mut builtin_table_updates = Vec::with_capacity(updates.len());
148        let updates = sort_updates(updates);
149
150        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
151            let mut retractions = InProgressRetractions::default();
152            let builtin_table_update = self.apply_updates_inner(
153                updates.collect(),
154                &mut retractions,
155                &mut LocalExpressionCache::Closed,
156            )?;
157            builtin_table_updates.extend(builtin_table_update);
158        }
159
160        Ok(builtin_table_updates)
161    }
162
163    #[instrument(level = "debug")]
164    fn apply_updates_inner(
165        &mut self,
166        updates: Vec<StateUpdate>,
167        retractions: &mut InProgressRetractions,
168        local_expression_cache: &mut LocalExpressionCache,
169    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
170        soft_assert_no_log!(
171            updates.iter().map(|update| update.ts).all_equal(),
172            "all timestamps should be equal: {updates:?}"
173        );
174
175        let mut update_system_config = false;
176
177        let mut builtin_table_updates = Vec::with_capacity(updates.len());
178        for StateUpdate { kind, ts: _, diff } in updates {
179            if matches!(kind, StateUpdateKind::SystemConfiguration(_)) {
180                update_system_config = true;
181            }
182
183            match diff {
184                StateDiff::Retraction => {
185                    builtin_table_updates
188                        .extend(self.generate_builtin_table_update(kind.clone(), diff));
189                    self.apply_update(kind, diff, retractions, local_expression_cache)?;
190                }
191                StateDiff::Addition => {
192                    self.apply_update(kind.clone(), diff, retractions, local_expression_cache)?;
193                    builtin_table_updates
196                        .extend(self.generate_builtin_table_update(kind.clone(), diff));
197                }
198            }
199        }
200
201        if update_system_config {
202            self.system_configuration.dyncfg_updates();
203        }
204
205        Ok(builtin_table_updates)
206    }
207
208    #[instrument(level = "debug")]
209    fn apply_update(
210        &mut self,
211        kind: StateUpdateKind,
212        diff: StateDiff,
213        retractions: &mut InProgressRetractions,
214        local_expression_cache: &mut LocalExpressionCache,
215    ) -> Result<(), CatalogError> {
216        match kind {
217            StateUpdateKind::Role(role) => {
218                self.apply_role_update(role, diff, retractions);
219            }
220            StateUpdateKind::RoleAuth(role_auth) => {
221                self.apply_role_auth_update(role_auth, diff, retractions);
222            }
223            StateUpdateKind::Database(database) => {
224                self.apply_database_update(database, diff, retractions);
225            }
226            StateUpdateKind::Schema(schema) => {
227                self.apply_schema_update(schema, diff, retractions);
228            }
229            StateUpdateKind::DefaultPrivilege(default_privilege) => {
230                self.apply_default_privilege_update(default_privilege, diff, retractions);
231            }
232            StateUpdateKind::SystemPrivilege(system_privilege) => {
233                self.apply_system_privilege_update(system_privilege, diff, retractions);
234            }
235            StateUpdateKind::SystemConfiguration(system_configuration) => {
236                self.apply_system_configuration_update(system_configuration, diff, retractions);
237            }
238            StateUpdateKind::Cluster(cluster) => {
239                self.apply_cluster_update(cluster, diff, retractions);
240            }
241            StateUpdateKind::NetworkPolicy(network_policy) => {
242                self.apply_network_policy_update(network_policy, diff, retractions);
243            }
244            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
245                self.apply_introspection_source_index_update(
246                    introspection_source_index,
247                    diff,
248                    retractions,
249                );
250            }
251            StateUpdateKind::ClusterReplica(cluster_replica) => {
252                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
253            }
254            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
255                self.apply_system_object_mapping_update(
256                    system_object_mapping,
257                    diff,
258                    retractions,
259                    local_expression_cache,
260                );
261            }
262            StateUpdateKind::TemporaryItem(item) => {
263                self.apply_temporary_item_update(item, diff, retractions);
264            }
265            StateUpdateKind::Item(item) => {
266                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
267            }
268            StateUpdateKind::Comment(comment) => {
269                self.apply_comment_update(comment, diff, retractions);
270            }
271            StateUpdateKind::SourceReferences(source_reference) => {
272                self.apply_source_references_update(source_reference, diff, retractions);
273            }
274            StateUpdateKind::AuditLog(_audit_log) => {
275                }
277            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
278                self.apply_storage_collection_metadata_update(
279                    storage_collection_metadata,
280                    diff,
281                    retractions,
282                );
283            }
284            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
285                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
286            }
287        }
288
289        Ok(())
290    }
291
292    #[instrument(level = "debug")]
293    fn apply_role_auth_update(
294        &mut self,
295        role_auth: mz_catalog::durable::RoleAuth,
296        diff: StateDiff,
297        retractions: &mut InProgressRetractions,
298    ) {
299        apply_with_update(
300            &mut self.role_auth_by_id,
301            role_auth,
302            |role_auth| role_auth.role_id,
303            diff,
304            &mut retractions.role_auths,
305        );
306    }
307
308    #[instrument(level = "debug")]
309    fn apply_role_update(
310        &mut self,
311        role: mz_catalog::durable::Role,
312        diff: StateDiff,
313        retractions: &mut InProgressRetractions,
314    ) {
315        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
316        apply_with_update(
317            &mut self.roles_by_id,
318            role,
319            |role| role.id,
320            diff,
321            &mut retractions.roles,
322        );
323    }
324
325    #[instrument(level = "debug")]
326    fn apply_database_update(
327        &mut self,
328        database: mz_catalog::durable::Database,
329        diff: StateDiff,
330        retractions: &mut InProgressRetractions,
331    ) {
332        apply_inverted_lookup(
333            &mut self.database_by_name,
334            &database.name,
335            database.id,
336            diff,
337        );
338        apply_with_update(
339            &mut self.database_by_id,
340            database,
341            |database| database.id,
342            diff,
343            &mut retractions.databases,
344        );
345    }
346
347    #[instrument(level = "debug")]
348    fn apply_schema_update(
349        &mut self,
350        schema: mz_catalog::durable::Schema,
351        diff: StateDiff,
352        retractions: &mut InProgressRetractions,
353    ) {
354        let (schemas_by_id, schemas_by_name) = match &schema.database_id {
355            Some(database_id) => {
356                let db = self
357                    .database_by_id
358                    .get_mut(database_id)
359                    .expect("catalog out of sync");
360                (&mut db.schemas_by_id, &mut db.schemas_by_name)
361            }
362            None => (
363                &mut self.ambient_schemas_by_id,
364                &mut self.ambient_schemas_by_name,
365            ),
366        };
367        apply_inverted_lookup(schemas_by_name, &schema.name, schema.id, diff);
368        apply_with_update(
369            schemas_by_id,
370            schema,
371            |schema| schema.id,
372            diff,
373            &mut retractions.schemas,
374        );
375    }
376
377    #[instrument(level = "debug")]
378    fn apply_default_privilege_update(
379        &mut self,
380        default_privilege: mz_catalog::durable::DefaultPrivilege,
381        diff: StateDiff,
382        _retractions: &mut InProgressRetractions,
383    ) {
384        match diff {
385            StateDiff::Addition => self
386                .default_privileges
387                .grant(default_privilege.object, default_privilege.acl_item),
388            StateDiff::Retraction => self
389                .default_privileges
390                .revoke(&default_privilege.object, &default_privilege.acl_item),
391        }
392    }
393
394    #[instrument(level = "debug")]
395    fn apply_system_privilege_update(
396        &mut self,
397        system_privilege: MzAclItem,
398        diff: StateDiff,
399        _retractions: &mut InProgressRetractions,
400    ) {
401        match diff {
402            StateDiff::Addition => self.system_privileges.grant(system_privilege),
403            StateDiff::Retraction => self.system_privileges.revoke(&system_privilege),
404        }
405    }
406
407    #[instrument(level = "debug")]
408    fn apply_system_configuration_update(
409        &mut self,
410        system_configuration: mz_catalog::durable::SystemConfiguration,
411        diff: StateDiff,
412        _retractions: &mut InProgressRetractions,
413    ) {
414        let res = match diff {
415            StateDiff::Addition => self.insert_system_configuration(
416                &system_configuration.name,
417                VarInput::Flat(&system_configuration.value),
418            ),
419            StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
420        };
421        match res {
422            Ok(_) => (),
423            Err(Error {
427                kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
428            }) => {
429                warn!(%name, "unknown system parameter from catalog storage");
430            }
431            Err(e) => panic!("unable to update system variable: {e:?}"),
432        }
433    }
434
435    #[instrument(level = "debug")]
436    fn apply_cluster_update(
437        &mut self,
438        cluster: mz_catalog::durable::Cluster,
439        diff: StateDiff,
440        retractions: &mut InProgressRetractions,
441    ) {
442        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
443        apply_with_update(
444            &mut self.clusters_by_id,
445            cluster,
446            |cluster| cluster.id,
447            diff,
448            &mut retractions.clusters,
449        );
450    }
451
452    #[instrument(level = "debug")]
453    fn apply_network_policy_update(
454        &mut self,
455        policy: mz_catalog::durable::NetworkPolicy,
456        diff: StateDiff,
457        retractions: &mut InProgressRetractions,
458    ) {
459        apply_inverted_lookup(
460            &mut self.network_policies_by_name,
461            &policy.name,
462            policy.id,
463            diff,
464        );
465        apply_with_update(
466            &mut self.network_policies_by_id,
467            policy,
468            |policy| policy.id,
469            diff,
470            &mut retractions.network_policies,
471        );
472    }
473
474    #[instrument(level = "debug")]
475    fn apply_introspection_source_index_update(
476        &mut self,
477        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
478        diff: StateDiff,
479        retractions: &mut InProgressRetractions,
480    ) {
481        let cluster = self
482            .clusters_by_id
483            .get_mut(&introspection_source_index.cluster_id)
484            .expect("catalog out of sync");
485        let log = BUILTIN_LOG_LOOKUP
486            .get(introspection_source_index.name.as_str())
487            .expect("missing log");
488        apply_inverted_lookup(
489            &mut cluster.log_indexes,
490            &log.variant,
491            introspection_source_index.index_id,
492            diff,
493        );
494
495        match diff {
496            StateDiff::Addition => {
497                if let Some(mut entry) = retractions
498                    .introspection_source_indexes
499                    .remove(&introspection_source_index.item_id)
500                {
501                    let (index_name, index) = self.create_introspection_source_index(
504                        introspection_source_index.cluster_id,
505                        log,
506                        introspection_source_index.index_id,
507                    );
508                    assert_eq!(entry.id, introspection_source_index.item_id);
509                    assert_eq!(entry.oid, introspection_source_index.oid);
510                    assert_eq!(entry.name, index_name);
511                    entry.item = index;
512                    self.insert_entry(entry);
513                } else {
514                    self.insert_introspection_source_index(
515                        introspection_source_index.cluster_id,
516                        log,
517                        introspection_source_index.item_id,
518                        introspection_source_index.index_id,
519                        introspection_source_index.oid,
520                    );
521                }
522            }
523            StateDiff::Retraction => {
524                let entry = self.drop_item(introspection_source_index.item_id);
525                retractions
526                    .introspection_source_indexes
527                    .insert(entry.id, entry);
528            }
529        }
530    }
531
532    #[instrument(level = "debug")]
533    fn apply_cluster_replica_update(
534        &mut self,
535        cluster_replica: mz_catalog::durable::ClusterReplica,
536        diff: StateDiff,
537        _retractions: &mut InProgressRetractions,
538    ) {
539        let cluster = self
540            .clusters_by_id
541            .get(&cluster_replica.cluster_id)
542            .expect("catalog out of sync");
543        let azs = cluster.availability_zones();
544        let location = self
545            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
546            .expect("catalog in unexpected state");
547        let cluster = self
548            .clusters_by_id
549            .get_mut(&cluster_replica.cluster_id)
550            .expect("catalog out of sync");
551        apply_inverted_lookup(
552            &mut cluster.replica_id_by_name_,
553            &cluster_replica.name,
554            cluster_replica.replica_id,
555            diff,
556        );
557        match diff {
558            StateDiff::Retraction => {
559                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
560                assert!(
561                    prev.is_some(),
562                    "retraction does not match existing value: {:?}",
563                    cluster_replica.replica_id
564                );
565            }
566            StateDiff::Addition => {
567                let logging = ReplicaLogging {
568                    log_logging: cluster_replica.config.logging.log_logging,
569                    interval: cluster_replica.config.logging.interval,
570                };
571                let config = ReplicaConfig {
572                    location,
573                    compute: ComputeReplicaConfig { logging },
574                };
575                let mem_cluster_replica = ClusterReplica {
576                    name: cluster_replica.name.clone(),
577                    cluster_id: cluster_replica.cluster_id,
578                    replica_id: cluster_replica.replica_id,
579                    config,
580                    owner_id: cluster_replica.owner_id,
581                };
582                let prev = cluster
583                    .replicas_by_id_
584                    .insert(cluster_replica.replica_id, mem_cluster_replica);
585                assert_eq!(
586                    prev, None,
587                    "values must be explicitly retracted before inserting a new value: {:?}",
588                    cluster_replica.replica_id
589                );
590            }
591        }
592    }
593
594    #[instrument(level = "debug")]
595    fn apply_system_object_mapping_update(
596        &mut self,
597        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
598        diff: StateDiff,
599        retractions: &mut InProgressRetractions,
600        local_expression_cache: &mut LocalExpressionCache,
601    ) {
602        let item_id = system_object_mapping.unique_identifier.catalog_id;
603        let global_id = system_object_mapping.unique_identifier.global_id;
604
605        if system_object_mapping.unique_identifier.runtime_alterable() {
606            return;
610        }
611
612        if let StateDiff::Retraction = diff {
613            let entry = self.drop_item(item_id);
614            retractions.system_object_mappings.insert(item_id, entry);
615            return;
616        }
617
618        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
619            self.insert_entry(entry);
624            return;
625        }
626
627        let builtin = BUILTIN_LOOKUP
628            .get(&system_object_mapping.description)
629            .expect("missing builtin")
630            .1;
631        let schema_name = builtin.schema();
632        let schema_id = self
633            .ambient_schemas_by_name
634            .get(schema_name)
635            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
636        let name = QualifiedItemName {
637            qualifiers: ItemQualifiers {
638                database_spec: ResolvedDatabaseSpecifier::Ambient,
639                schema_spec: SchemaSpecifier::Id(*schema_id),
640            },
641            item: builtin.name().into(),
642        };
643        match builtin {
644            Builtin::Log(log) => {
645                let mut acl_items = vec![rbac::owner_privilege(
646                    mz_sql::catalog::ObjectType::Source,
647                    MZ_SYSTEM_ROLE_ID,
648                )];
649                acl_items.extend_from_slice(&log.access);
650                self.insert_item(
651                    item_id,
652                    log.oid,
653                    name.clone(),
654                    CatalogItem::Log(Log {
655                        variant: log.variant,
656                        global_id,
657                    }),
658                    MZ_SYSTEM_ROLE_ID,
659                    PrivilegeMap::from_mz_acl_items(acl_items),
660                );
661            }
662
663            Builtin::Table(table) => {
664                let mut acl_items = vec![rbac::owner_privilege(
665                    mz_sql::catalog::ObjectType::Table,
666                    MZ_SYSTEM_ROLE_ID,
667                )];
668                acl_items.extend_from_slice(&table.access);
669
670                self.insert_item(
671                    item_id,
672                    table.oid,
673                    name.clone(),
674                    CatalogItem::Table(Table {
675                        create_sql: None,
676                        desc: VersionedRelationDesc::new(table.desc.clone()),
677                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
678                        conn_id: None,
679                        resolved_ids: ResolvedIds::empty(),
680                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
681                            || {
682                                self.system_config()
683                                    .metrics_retention()
684                                    .try_into()
685                                    .expect("invalid metrics retention")
686                            },
687                        ),
688                        is_retained_metrics_object: table.is_retained_metrics_object,
689                        data_source: TableDataSource::TableWrites {
690                            defaults: vec![Expr::null(); table.desc.arity()],
691                        },
692                    }),
693                    MZ_SYSTEM_ROLE_ID,
694                    PrivilegeMap::from_mz_acl_items(acl_items),
695                );
696            }
697            Builtin::Index(index) => {
698                let custom_logical_compaction_window =
699                    index.is_retained_metrics_object.then(|| {
700                        self.system_config()
701                            .metrics_retention()
702                            .try_into()
703                            .expect("invalid metrics retention")
704                    });
705                let versions = BTreeMap::new();
707
708                let item = self
709                    .parse_item(
710                        global_id,
711                        &index.create_sql(),
712                        &versions,
713                        None,
714                        index.is_retained_metrics_object,
715                        custom_logical_compaction_window,
716                        local_expression_cache,
717                        None,
718                    )
719                    .unwrap_or_else(|e| {
720                        panic!(
721                            "internal error: failed to load bootstrap index:\n\
722                                    {}\n\
723                                    error:\n\
724                                    {:?}\n\n\
725                                    make sure that the schema name is specified in the builtin index's create sql statement.",
726                            index.name, e
727                        )
728                    });
729                let CatalogItem::Index(_) = item else {
730                    panic!(
731                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
732                        index.name
733                    );
734                };
735
736                self.insert_item(
737                    item_id,
738                    index.oid,
739                    name,
740                    item,
741                    MZ_SYSTEM_ROLE_ID,
742                    PrivilegeMap::default(),
743                );
744            }
745            Builtin::View(_) => {
746                unreachable!("views added elsewhere");
748            }
749
750            Builtin::Type(typ) => {
752                let typ = self.resolve_builtin_type_references(typ);
753                if let CatalogType::Array { element_reference } = typ.details.typ {
754                    let entry = self.get_entry_mut(&element_reference);
755                    let item_type = match &mut entry.item {
756                        CatalogItem::Type(item_type) => item_type,
757                        _ => unreachable!("types can only reference other types"),
758                    };
759                    item_type.details.array_id = Some(item_id);
760                }
761
762                let desc = None;
766                assert!(!matches!(typ.details.typ, CatalogType::Record { .. }));
767                let schema_id = self.resolve_system_schema(typ.schema);
768
769                self.insert_item(
770                    item_id,
771                    typ.oid,
772                    QualifiedItemName {
773                        qualifiers: ItemQualifiers {
774                            database_spec: ResolvedDatabaseSpecifier::Ambient,
775                            schema_spec: SchemaSpecifier::Id(schema_id),
776                        },
777                        item: typ.name.to_owned(),
778                    },
779                    CatalogItem::Type(Type {
780                        create_sql: None,
781                        global_id,
782                        details: typ.details.clone(),
783                        desc,
784                        resolved_ids: ResolvedIds::empty(),
785                    }),
786                    MZ_SYSTEM_ROLE_ID,
787                    PrivilegeMap::from_mz_acl_items(vec![
788                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
789                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
790                    ]),
791                );
792            }
793
794            Builtin::Func(func) => {
795                let oid = INVALID_OID;
799                self.insert_item(
800                    item_id,
801                    oid,
802                    name.clone(),
803                    CatalogItem::Func(Func {
804                        inner: func.inner,
805                        global_id,
806                    }),
807                    MZ_SYSTEM_ROLE_ID,
808                    PrivilegeMap::default(),
809                );
810            }
811
812            Builtin::Source(coll) => {
813                let mut acl_items = vec![rbac::owner_privilege(
814                    mz_sql::catalog::ObjectType::Source,
815                    MZ_SYSTEM_ROLE_ID,
816                )];
817                acl_items.extend_from_slice(&coll.access);
818
819                self.insert_item(
820                    item_id,
821                    coll.oid,
822                    name.clone(),
823                    CatalogItem::Source(Source {
824                        create_sql: None,
825                        data_source: DataSourceDesc::Introspection(coll.data_source),
826                        desc: coll.desc.clone(),
827                        global_id,
828                        timeline: Timeline::EpochMilliseconds,
829                        resolved_ids: ResolvedIds::empty(),
830                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
831                            || {
832                                self.system_config()
833                                    .metrics_retention()
834                                    .try_into()
835                                    .expect("invalid metrics retention")
836                            },
837                        ),
838                        is_retained_metrics_object: coll.is_retained_metrics_object,
839                    }),
840                    MZ_SYSTEM_ROLE_ID,
841                    PrivilegeMap::from_mz_acl_items(acl_items),
842                );
843            }
844            Builtin::ContinualTask(ct) => {
845                let mut acl_items = vec![rbac::owner_privilege(
846                    mz_sql::catalog::ObjectType::Source,
847                    MZ_SYSTEM_ROLE_ID,
848                )];
849                acl_items.extend_from_slice(&ct.access);
850                let versions = BTreeMap::new();
852
853                let item = self
854                    .parse_item(
855                        global_id,
856                        &ct.create_sql(),
857                        &versions,
858                        None,
859                        false,
860                        None,
861                        local_expression_cache,
862                        None,
863                    )
864                    .unwrap_or_else(|e| {
865                        panic!(
866                            "internal error: failed to load bootstrap continual task:\n\
867                                    {}\n\
868                                    error:\n\
869                                    {:?}\n\n\
870                                    make sure that the schema name is specified in the builtin continual task's create sql statement.",
871                            ct.name, e
872                        )
873                    });
874                let CatalogItem::ContinualTask(_) = &item else {
875                    panic!(
876                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
877                        ct.name
878                    );
879                };
880
881                self.insert_item(
882                    item_id,
883                    ct.oid,
884                    name,
885                    item,
886                    MZ_SYSTEM_ROLE_ID,
887                    PrivilegeMap::from_mz_acl_items(acl_items),
888                );
889            }
890            Builtin::Connection(connection) => {
891                let versions = BTreeMap::new();
893                let mut item = self
894                    .parse_item(
895                        global_id,
896                        connection.sql,
897                        &versions,
898                        None,
899                        false,
900                        None,
901                        local_expression_cache,
902                        None,
903                    )
904                    .unwrap_or_else(|e| {
905                        panic!(
906                            "internal error: failed to load bootstrap connection:\n\
907                                    {}\n\
908                                    error:\n\
909                                    {:?}\n\n\
910                                    make sure that the schema name is specified in the builtin connection's create sql statement.",
911                            connection.name, e
912                        )
913                    });
914                let CatalogItem::Connection(_) = &mut item else {
915                    panic!(
916                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
917                        connection.name
918                    );
919                };
920
921                let mut acl_items = vec![rbac::owner_privilege(
922                    mz_sql::catalog::ObjectType::Connection,
923                    connection.owner_id.clone(),
924                )];
925                acl_items.extend_from_slice(connection.access);
926
927                self.insert_item(
928                    item_id,
929                    connection.oid,
930                    name.clone(),
931                    item,
932                    connection.owner_id.clone(),
933                    PrivilegeMap::from_mz_acl_items(acl_items),
934                );
935            }
936        }
937    }
938
939    #[instrument(level = "debug")]
940    fn apply_temporary_item_update(
941        &mut self,
942        TemporaryItem {
943            id,
944            oid,
945            name,
946            item,
947            owner_id,
948            privileges,
949        }: TemporaryItem,
950        diff: StateDiff,
951        retractions: &mut InProgressRetractions,
952    ) {
953        match diff {
954            StateDiff::Addition => {
955                let entry = match retractions.temp_items.remove(&id) {
956                    Some(mut retraction) => {
957                        assert_eq!(retraction.id, id);
958                        retraction.item = item;
959                        retraction.id = id;
960                        retraction.oid = oid;
961                        retraction.name = name;
962                        retraction.owner_id = owner_id;
963                        retraction.privileges = privileges;
964                        retraction
965                    }
966                    None => CatalogEntry {
967                        item,
968                        referenced_by: Vec::new(),
969                        used_by: Vec::new(),
970                        id,
971                        oid,
972                        name,
973                        owner_id,
974                        privileges,
975                    },
976                };
977                self.insert_entry(entry);
978            }
979            StateDiff::Retraction => {
980                let entry = self.drop_item(id);
981                retractions.temp_items.insert(id, entry);
982            }
983        }
984    }
985
986    #[instrument(level = "debug")]
987    fn apply_item_update(
988        &mut self,
989        item: mz_catalog::durable::Item,
990        diff: StateDiff,
991        retractions: &mut InProgressRetractions,
992        local_expression_cache: &mut LocalExpressionCache,
993    ) -> Result<(), CatalogError> {
994        match diff {
995            StateDiff::Addition => {
996                let key = item.key();
997                let mz_catalog::durable::Item {
998                    id,
999                    oid,
1000                    global_id,
1001                    schema_id,
1002                    name,
1003                    create_sql,
1004                    owner_id,
1005                    privileges,
1006                    extra_versions,
1007                } = item;
1008                let schema = self.find_non_temp_schema(&schema_id);
1009                let name = QualifiedItemName {
1010                    qualifiers: ItemQualifiers {
1011                        database_spec: schema.database().clone(),
1012                        schema_spec: schema.id().clone(),
1013                    },
1014                    item: name.clone(),
1015                };
1016                let entry = match retractions.items.remove(&key) {
1017                    Some(mut retraction) => {
1018                        assert_eq!(retraction.id, item.id);
1019                        if retraction.create_sql() != create_sql {
1024                            let item = self
1025                                .deserialize_item(
1026                                    global_id,
1027                                    &create_sql,
1028                                    &extra_versions,
1029                                    local_expression_cache,
1030                                    Some(retraction.item),
1031                                )
1032                                .unwrap_or_else(|e| {
1033                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
1034                                });
1035                            retraction.item = item;
1036                        }
1037                        retraction.id = id;
1038                        retraction.oid = oid;
1039                        retraction.name = name;
1040                        retraction.owner_id = owner_id;
1041                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1042
1043                        retraction
1044                    }
1045                    None => {
1046                        let catalog_item = self
1047                            .deserialize_item(
1048                                global_id,
1049                                &create_sql,
1050                                &extra_versions,
1051                                local_expression_cache,
1052                                None,
1053                            )
1054                            .unwrap_or_else(|e| {
1055                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
1056                            });
1057                        CatalogEntry {
1058                            item: catalog_item,
1059                            referenced_by: Vec::new(),
1060                            used_by: Vec::new(),
1061                            id,
1062                            oid,
1063                            name,
1064                            owner_id,
1065                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1066                        }
1067                    }
1068                };
1069
1070                self.insert_entry(entry);
1071            }
1072            StateDiff::Retraction => {
1073                let entry = self.drop_item(item.id);
1074                let key = item.into_key_value().0;
1075                retractions.items.insert(key, entry);
1076            }
1077        }
1078        Ok(())
1079    }
1080
1081    #[instrument(level = "debug")]
1082    fn apply_comment_update(
1083        &mut self,
1084        comment: mz_catalog::durable::Comment,
1085        diff: StateDiff,
1086        _retractions: &mut InProgressRetractions,
1087    ) {
1088        match diff {
1089            StateDiff::Addition => {
1090                let prev = self.comments.update_comment(
1091                    comment.object_id,
1092                    comment.sub_component,
1093                    Some(comment.comment),
1094                );
1095                assert_eq!(
1096                    prev, None,
1097                    "values must be explicitly retracted before inserting a new value"
1098                );
1099            }
1100            StateDiff::Retraction => {
1101                let prev =
1102                    self.comments
1103                        .update_comment(comment.object_id, comment.sub_component, None);
1104                assert_eq!(
1105                    prev,
1106                    Some(comment.comment),
1107                    "retraction does not match existing value: ({:?}, {:?})",
1108                    comment.object_id,
1109                    comment.sub_component,
1110                );
1111            }
1112        }
1113    }
1114
1115    #[instrument(level = "debug")]
1116    fn apply_source_references_update(
1117        &mut self,
1118        source_references: mz_catalog::durable::SourceReferences,
1119        diff: StateDiff,
1120        _retractions: &mut InProgressRetractions,
1121    ) {
1122        match diff {
1123            StateDiff::Addition => {
1124                let prev = self
1125                    .source_references
1126                    .insert(source_references.source_id, source_references.into());
1127                assert!(
1128                    prev.is_none(),
1129                    "values must be explicitly retracted before inserting a new value: {prev:?}"
1130                );
1131            }
1132            StateDiff::Retraction => {
1133                let prev = self.source_references.remove(&source_references.source_id);
1134                assert!(
1135                    prev.is_some(),
1136                    "retraction for a non-existent existing value: {source_references:?}"
1137                );
1138            }
1139        }
1140    }
1141
1142    #[instrument(level = "debug")]
1143    fn apply_storage_collection_metadata_update(
1144        &mut self,
1145        storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1146        diff: StateDiff,
1147        _retractions: &mut InProgressRetractions,
1148    ) {
1149        apply_inverted_lookup(
1150            &mut self.storage_metadata.collection_metadata,
1151            &storage_collection_metadata.id,
1152            storage_collection_metadata.shard,
1153            diff,
1154        );
1155    }
1156
1157    #[instrument(level = "debug")]
1158    fn apply_unfinalized_shard_update(
1159        &mut self,
1160        unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1161        diff: StateDiff,
1162        _retractions: &mut InProgressRetractions,
1163    ) {
1164        match diff {
1165            StateDiff::Addition => {
1166                let newly_inserted = self
1167                    .storage_metadata
1168                    .unfinalized_shards
1169                    .insert(unfinalized_shard.shard);
1170                assert!(
1171                    newly_inserted,
1172                    "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1173                );
1174            }
1175            StateDiff::Retraction => {
1176                let removed = self
1177                    .storage_metadata
1178                    .unfinalized_shards
1179                    .remove(&unfinalized_shard.shard);
1180                assert!(
1181                    removed,
1182                    "retraction does not match existing value: {unfinalized_shard:?}"
1183                );
1184            }
1185        }
1186    }
1187
1188    #[instrument]
1191    pub(crate) fn generate_builtin_table_updates(
1192        &self,
1193        updates: Vec<StateUpdate>,
1194    ) -> Vec<BuiltinTableUpdate> {
1195        let mut builtin_table_updates = Vec::new();
1196        for StateUpdate { kind, ts: _, diff } in updates {
1197            let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1198            let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1199            builtin_table_updates.extend(builtin_table_update);
1200        }
1201        builtin_table_updates
1202    }
1203
1204    #[instrument(level = "debug")]
1207    pub(crate) fn generate_builtin_table_update(
1208        &self,
1209        kind: StateUpdateKind,
1210        diff: StateDiff,
1211    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1212        let diff = diff.into();
1213        match kind {
1214            StateUpdateKind::Role(role) => {
1215                let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1216                for group_id in role.membership.map.keys() {
1217                    builtin_table_updates
1218                        .push(self.pack_role_members_update(*group_id, role.id, diff))
1219                }
1220                builtin_table_updates
1221            }
1222            StateUpdateKind::RoleAuth(role_auth) => {
1223                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1224            }
1225            StateUpdateKind::Database(database) => {
1226                vec![self.pack_database_update(&database.id, diff)]
1227            }
1228            StateUpdateKind::Schema(schema) => {
1229                let db_spec = schema.database_id.into();
1230                vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1231            }
1232            StateUpdateKind::DefaultPrivilege(default_privilege) => {
1233                vec![self.pack_default_privileges_update(
1234                    &default_privilege.object,
1235                    &default_privilege.acl_item.grantee,
1236                    &default_privilege.acl_item.acl_mode,
1237                    diff,
1238                )]
1239            }
1240            StateUpdateKind::SystemPrivilege(system_privilege) => {
1241                vec![self.pack_system_privileges_update(system_privilege, diff)]
1242            }
1243            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1244            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1245            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1246                self.pack_item_update(introspection_source_index.item_id, diff)
1247            }
1248            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1249                cluster_replica.cluster_id,
1250                &cluster_replica.name,
1251                diff,
1252            ),
1253            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1254                if !system_object_mapping.unique_identifier.runtime_alterable() {
1258                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1259                } else {
1260                    vec![]
1261                }
1262            }
1263            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1264            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1265            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1266                comment.object_id,
1267                comment.sub_component,
1268                &comment.comment,
1269                diff,
1270            )],
1271            StateUpdateKind::SourceReferences(source_references) => {
1272                self.pack_source_references_update(&source_references, diff)
1273            }
1274            StateUpdateKind::AuditLog(audit_log) => {
1275                vec![
1276                    self.pack_audit_log_update(&audit_log.event, diff)
1277                        .expect("could not pack audit log update"),
1278                ]
1279            }
1280            StateUpdateKind::NetworkPolicy(policy) => self
1281                .pack_network_policy_update(&policy.id, diff)
1282                .expect("could not pack audit log update"),
1283            StateUpdateKind::StorageCollectionMetadata(_)
1284            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1285        }
1286    }
1287
1288    fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1289        self.entry_by_id
1290            .get_mut(id)
1291            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1292    }
1293
1294    fn get_schema_mut(
1295        &mut self,
1296        database_spec: &ResolvedDatabaseSpecifier,
1297        schema_spec: &SchemaSpecifier,
1298        conn_id: &ConnectionId,
1299    ) -> &mut Schema {
1300        match (database_spec, schema_spec) {
1302            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1303                .temporary_schemas
1304                .get_mut(conn_id)
1305                .expect("catalog out of sync"),
1306            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1307                .ambient_schemas_by_id
1308                .get_mut(id)
1309                .expect("catalog out of sync"),
1310            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1311                .database_by_id
1312                .get_mut(database_id)
1313                .expect("catalog out of sync")
1314                .schemas_by_id
1315                .get_mut(schema_id)
1316                .expect("catalog out of sync"),
1317            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1318                unreachable!("temporary schemas are in the ambient database")
1319            }
1320        }
1321    }
1322
    #[instrument(name = "catalog::parse_views")]
    /// Parses the given builtin views in spawned tasks and inserts them into `state`, returning
    /// the builtin table updates describing every view that was inserted.
    ///
    /// Views that have a stashed entry in `retractions.system_object_mappings` are not parsed
    /// again: the stashed entry is re-inserted as-is. All other views are parsed with
    /// `parse_item_inner` against a cloned snapshot of `state`. When parsing fails because a
    /// dependency is missing (`InvalidId` or `UnknownItem`), the view is parked until that
    /// dependency has been inserted and is then retried. Views failing with `InvalidCast` are
    /// parked until no task is in flight and nothing else is ready, and are then retried.
    ///
    /// Cached expressions taken out of `local_expression_cache` for a parse attempt are put back
    /// when the attempt fails with one of the retryable errors above.
    ///
    /// # Panics
    ///
    /// Panics if a view fails to parse with any other error, if a spawned task fails to join, or
    /// if any view is still waiting once the loop terminates.
    async fn parse_builtin_views(
        state: &mut CatalogState,
        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
        // Split the views into those with a stashed retraction (`updates`) and those that must
        // be parsed from scratch (`additions`).
        let (updates, additions): (Vec<_>, Vec<_>) =
            builtin_views
                .into_iter()
                .partition_map(|(view, item_id, gid)| {
                    match retractions.system_object_mappings.remove(&item_id) {
                        Some(entry) => Either::Left(entry),
                        None => Either::Right((view, item_id, gid)),
                    }
                });

        for entry in updates {
            // Put the previously retracted entry straight back into the catalog without
            // re-parsing it.
            // NOTE(review): presumably safe because the stashed entry was built from the same
            // compiled-in definition — confirm.
            let item_id = entry.id();
            state.insert_entry(entry);
            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
        }

        // In-flight parse tasks.
        let mut handles = Vec::new();
        // Views parked on a missing dependency that was reported by ID, keyed by that ID.
        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
            BTreeMap::new();
        // Views parked on a missing dependency that was reported by name, keyed by that name.
        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
        // Views that failed with `InvalidCast`; they are only retried once nothing else can make
        // progress.
        let mut awaiting_all = Vec::new();
        // IDs of views that have been inserted into `state`.
        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
        // Full names of views that have been inserted into `state`.
        let mut completed_names: BTreeSet<String> = BTreeSet::new();

        // Views that still need to be parsed, keyed by item ID. Entries are removed as views
        // complete.
        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
            .into_iter()
            .map(|(view, item_id, gid)| (item_id, (view, gid)))
            .collect();
        // Remember every ID up front: `views` is drained by the loop below, but the IDs are
        // needed afterwards to pack the builtin table updates.
        let item_ids: Vec<_> = views.keys().copied().collect();

        // Every view starts out ready; dependency errors move views into the waiting maps.
        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
            if handles.is_empty() && ready.is_empty() {
                // Nothing is running and nothing is ready: release the views that were waiting
                // for everything else.
                ready.extend(awaiting_all.drain(..));
            }

            // Spawn one parse task per ready view.
            if !ready.is_empty() {
                // All tasks spawned in this batch share one snapshot of the current state.
                let spawn_state = Arc::new(state.clone());
                while let Some(id) = ready.pop_front() {
                    let (view, global_id) = views.get(&id).expect("must exist");
                    let global_id = *global_id;
                    let create_sql = view.create_sql();
                    // No extra versions are supplied when parsing a builtin view.
                    let versions = BTreeMap::new();

                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
                    let task_state = Arc::clone(&spawn_state);
                    // Take the cached expression out of the cache so it can be moved into the
                    // task; it is re-inserted below if the attempt has to be retried.
                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
                    let handle = mz_ore::task::spawn(
                        || "parse view",
                        async move {
                            let res = task_state.parse_item_inner(
                                global_id,
                                &create_sql,
                                &versions,
                                None,
                                false,
                                None,
                                cached_expr,
                                None,
                            );
                            (id, global_id, res)
                        }
                        .instrument(span),
                    );
                    handles.push(handle);
                }
            }

            // Wait for any one task to finish; the rest stay in flight.
            let (handle, _idx, remaining) = future::select_all(handles).await;
            handles = remaining;
            let (id, global_id, res) = handle.expect("must join");
            // Returns an unused cached expression to the cache.
            let mut insert_cached_expr = |cached_expr| {
                if let Some(cached_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
                }
            };
            match res {
                Ok((item, uncached_expr)) => {
                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                        local_expression_cache.insert_uncached_expression(
                            global_id,
                            uncached_expr,
                            optimizer_features,
                        );
                    }
                    // The view parsed successfully: insert it into the catalog.
                    let (view, _gid) = views.remove(&id).expect("must exist");
                    let schema_id = state
                        .ambient_schemas_by_name
                        .get(view.schema)
                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
                    let qname = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(*schema_id),
                        },
                        item: view.name.into(),
                    };
                    let mut acl_items = vec![rbac::owner_privilege(
                        mz_sql::catalog::ObjectType::View,
                        MZ_SYSTEM_ROLE_ID,
                    )];
                    acl_items.extend_from_slice(&view.access);

                    state.insert_item(
                        id,
                        view.oid,
                        qname,
                        item,
                        MZ_SYSTEM_ROLE_ID,
                        PrivilegeMap::from_mz_acl_items(acl_items),
                    );

                    // Wake up every view that was waiting on this one, by ID or by name.
                    let mut resolved_dependent_items = Vec::new();
                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    let entry = state.get_entry(&id);
                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    ready.extend(resolved_dependent_items);

                    completed_ids.insert(id);
                    completed_names.insert(full_name);
                }
                // A dependency identified by ID was missing.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    if completed_ids.contains(&missing_dep) {
                        // The dependency has been inserted in the meantime (presumably this task
                        // ran against a snapshot taken before that), so retry right away.
                        ready.push_back(id);
                    } else {
                        awaiting_id_dependencies
                            .entry(missing_dep)
                            .or_default()
                            .push(id);
                    }
                }
                // A dependency was reported as an unknown item; the payload is a string that is
                // either a parseable item ID or a name.
                Err((
                    AdapterError::PlanError(plan::PlanError::Catalog(
                        SqlCatalogError::UnknownItem(missing_dep),
                    )),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    match CatalogItemId::from_str(&missing_dep) {
                        Ok(missing_dep) => {
                            if completed_ids.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_id_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                        Err(_) => {
                            if completed_names.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_name_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                    }
                }
                // The error does not name a specific dependency, so defer the view until
                // everything else has been processed.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    awaiting_all.push(id);
                }
                // Any other error is treated as fatal.
                Err((e, _)) => {
                    let (bad_view, _gid) = views.get(&id).expect("must exist");
                    panic!(
                        "internal error: failed to load bootstrap view:\n\
                            {name}\n\
                            error:\n\
                            {e:?}\n\n\
                            Make sure that the schema name is specified in the builtin view's create sql statement.
                            ",
                        name = bad_view.name,
                    )
                }
            }
        }

        // Every view must have been parsed and inserted by now.
        assert!(awaiting_id_dependencies.is_empty());
        assert!(
            awaiting_name_dependencies.is_empty(),
            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
        );
        assert!(awaiting_all.is_empty());
        assert!(views.is_empty());

        // Pack builtin table updates for all freshly parsed views.
        builtin_table_updates.extend(
            item_ids
                .into_iter()
                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
        );

        builtin_table_updates
    }
1566
1567    fn insert_entry(&mut self, entry: CatalogEntry) {
1569        if !entry.id.is_system() {
1570            if let Some(cluster_id) = entry.item.cluster_id() {
1571                self.clusters_by_id
1572                    .get_mut(&cluster_id)
1573                    .expect("catalog out of sync")
1574                    .bound_objects
1575                    .insert(entry.id);
1576            };
1577        }
1578
1579        for u in entry.references().items() {
1580            match self.entry_by_id.get_mut(u) {
1581                Some(metadata) => metadata.referenced_by.push(entry.id()),
1582                None => panic!(
1583                    "Catalog: missing dependent catalog item {} while installing {}",
1584                    &u,
1585                    self.resolve_full_name(entry.name(), entry.conn_id())
1586                ),
1587            }
1588        }
1589        for u in entry.uses() {
1590            if u == entry.id() {
1593                continue;
1594            }
1595            match self.entry_by_id.get_mut(&u) {
1596                Some(metadata) => metadata.used_by.push(entry.id()),
1597                None => panic!(
1598                    "Catalog: missing dependent catalog item {} while installing {}",
1599                    &u,
1600                    self.resolve_full_name(entry.name(), entry.conn_id())
1601                ),
1602            }
1603        }
1604        for gid in entry.item.global_ids() {
1605            self.entry_by_global_id.insert(gid, entry.id());
1606        }
1607        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1608        let schema = self.get_schema_mut(
1609            &entry.name().qualifiers.database_spec,
1610            &entry.name().qualifiers.schema_spec,
1611            conn_id,
1612        );
1613
1614        let prev_id = match entry.item() {
1615            CatalogItem::Func(_) => schema
1616                .functions
1617                .insert(entry.name().item.clone(), entry.id()),
1618            CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1619            _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1620        };
1621
1622        assert!(
1623            prev_id.is_none(),
1624            "builtin name collision on {:?}",
1625            entry.name().item.clone()
1626        );
1627
1628        self.entry_by_id.insert(entry.id(), entry.clone());
1629    }
1630
1631    fn insert_item(
1633        &mut self,
1634        id: CatalogItemId,
1635        oid: u32,
1636        name: QualifiedItemName,
1637        item: CatalogItem,
1638        owner_id: RoleId,
1639        privileges: PrivilegeMap,
1640    ) {
1641        let entry = CatalogEntry {
1642            item,
1643            name,
1644            id,
1645            oid,
1646            used_by: Vec::new(),
1647            referenced_by: Vec::new(),
1648            owner_id,
1649            privileges,
1650        };
1651
1652        self.insert_entry(entry);
1653    }
1654
1655    #[mz_ore::instrument(level = "trace")]
1656    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1657        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1658        for u in metadata.references().items() {
1659            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1660                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1661            }
1662        }
1663        for u in metadata.uses() {
1664            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1665                dep_metadata.used_by.retain(|u| *u != metadata.id())
1666            }
1667        }
1668        for gid in metadata.global_ids() {
1669            self.entry_by_global_id.remove(&gid);
1670        }
1671
1672        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1673        let schema = self.get_schema_mut(
1674            &metadata.name().qualifiers.database_spec,
1675            &metadata.name().qualifiers.schema_spec,
1676            conn_id,
1677        );
1678        if metadata.item_type() == CatalogItemType::Type {
1679            schema
1680                .types
1681                .remove(&metadata.name().item)
1682                .expect("catalog out of sync");
1683        } else {
1684            assert_ne!(metadata.item_type(), CatalogItemType::Func);
1687
1688            schema
1689                .items
1690                .remove(&metadata.name().item)
1691                .expect("catalog out of sync");
1692        };
1693
1694        if !id.is_system() {
1695            if let Some(cluster_id) = metadata.item().cluster_id() {
1696                assert!(
1697                    self.clusters_by_id
1698                        .get_mut(&cluster_id)
1699                        .expect("catalog out of sync")
1700                        .bound_objects
1701                        .remove(&id),
1702                    "catalog out of sync"
1703                );
1704            }
1705        }
1706
1707        metadata
1708    }
1709
1710    fn insert_introspection_source_index(
1711        &mut self,
1712        cluster_id: ClusterId,
1713        log: &'static BuiltinLog,
1714        item_id: CatalogItemId,
1715        global_id: GlobalId,
1716        oid: u32,
1717    ) {
1718        let (index_name, index) =
1719            self.create_introspection_source_index(cluster_id, log, global_id);
1720        self.insert_item(
1721            item_id,
1722            oid,
1723            index_name,
1724            index,
1725            MZ_SYSTEM_ROLE_ID,
1726            PrivilegeMap::default(),
1727        );
1728    }
1729
1730    fn create_introspection_source_index(
1731        &self,
1732        cluster_id: ClusterId,
1733        log: &'static BuiltinLog,
1734        global_id: GlobalId,
1735    ) -> (QualifiedItemName, CatalogItem) {
1736        let source_name = FullItemName {
1737            database: RawDatabaseSpecifier::Ambient,
1738            schema: log.schema.into(),
1739            item: log.name.into(),
1740        };
1741        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1742        let mut index_name = QualifiedItemName {
1743            qualifiers: ItemQualifiers {
1744                database_spec: ResolvedDatabaseSpecifier::Ambient,
1745                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1746            },
1747            item: index_name.clone(),
1748        };
1749        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1750        let index_item_name = index_name.item.clone();
1751        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1752        let index = CatalogItem::Index(Index {
1753            global_id,
1754            on: log_global_id,
1755            keys: log
1756                .variant
1757                .index_by()
1758                .into_iter()
1759                .map(MirScalarExpr::column)
1760                .collect(),
1761            create_sql: index_sql(
1762                index_item_name,
1763                cluster_id,
1764                source_name,
1765                &log.variant.desc(),
1766                &log.variant.index_by(),
1767            ),
1768            conn_id: None,
1769            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1770            cluster_id,
1771            is_retained_metrics_object: false,
1772            custom_logical_compaction_window: None,
1773        });
1774        (index_name, index)
1775    }
1776
1777    fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1782        Ok(self.system_configuration.set(name, value)?)
1783    }
1784
1785    fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1790        Ok(self.system_configuration.reset(name)?)
1791    }
1792}
1793
1794fn sort_updates(mut updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1796    let mut sorted_updates = Vec::with_capacity(updates.len());
1797
1798    updates.sort_by_key(|update| update.ts);
1799    for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
1800        let sorted_ts_updates = sort_updates_inner(updates.collect());
1801        sorted_updates.extend(sorted_ts_updates);
1802    }
1803
1804    sorted_updates
1805}
1806
/// Orders a batch of updates that all carry the same timestamp.
///
/// The updates are partitioned by kind, each partition is ordered internally, and the
/// partitions are then concatenated: first all retractions, then all additions. Retraction
/// partitions are emitted in the reverse of the partition order used for additions, and each
/// retraction partition is itself reversed.
///
/// # Panics
///
/// Panics if a `SystemObjectMapping` update does not name a known builtin, if a connection's
/// `create_sql` cannot be parsed or analysed for dependencies, if the connections in the
/// batch form a dependency cycle, or if an item and a temporary item share an ID.
fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
    /// Appends `update` to `retractions` or `additions` depending on `diff`.
    fn push_update<T>(
        update: T,
        diff: StateDiff,
        retractions: &mut Vec<T>,
        additions: &mut Vec<T>,
    ) {
        match diff {
            StateDiff::Retraction => retractions.push(update),
            StateDiff::Addition => additions.push(update),
        }
    }

    soft_assert_no_log!(
        updates.iter().map(|update| update.ts).all_equal(),
        "all timestamps should be equal: {updates:?}"
    );

    // Partition the updates by kind so that each kind can be placed at the right position in
    // the final order.
    let mut pre_cluster_retractions = Vec::new();
    let mut pre_cluster_additions = Vec::new();
    let mut cluster_retractions = Vec::new();
    let mut cluster_additions = Vec::new();
    let mut builtin_item_updates = Vec::new();
    let mut item_retractions = Vec::new();
    let mut item_additions = Vec::new();
    let mut temp_item_retractions = Vec::new();
    let mut temp_item_additions = Vec::new();
    let mut post_item_retractions = Vec::new();
    let mut post_item_additions = Vec::new();
    for update in updates {
        // Taken up front because `update` is moved (whole or in parts) by the match below.
        let diff = update.diff.clone();
        match update.kind {
            StateUpdateKind::Role(_)
            | StateUpdateKind::RoleAuth(_)
            | StateUpdateKind::Database(_)
            | StateUpdateKind::Schema(_)
            | StateUpdateKind::DefaultPrivilege(_)
            | StateUpdateKind::SystemPrivilege(_)
            | StateUpdateKind::SystemConfiguration(_)
            | StateUpdateKind::NetworkPolicy(_) => push_update(
                update,
                diff,
                &mut pre_cluster_retractions,
                &mut pre_cluster_additions,
            ),
            StateUpdateKind::Cluster(_)
            | StateUpdateKind::IntrospectionSourceIndex(_)
            | StateUpdateKind::ClusterReplica(_) => push_update(
                update,
                diff,
                &mut cluster_retractions,
                &mut cluster_additions,
            ),
            // Builtin mappings are not split by diff yet; they are first sorted as one list
            // below.
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
            }
            // Items and temporary items are unwrapped into tuples so they can be sorted by
            // their own fields; they are re-wrapped into `StateUpdate`s when merged.
            StateUpdateKind::TemporaryItem(item) => push_update(
                (item, update.ts, update.diff),
                diff,
                &mut temp_item_retractions,
                &mut temp_item_additions,
            ),
            StateUpdateKind::Item(item) => push_update(
                (item, update.ts, update.diff),
                diff,
                &mut item_retractions,
                &mut item_additions,
            ),
            StateUpdateKind::Comment(_)
            | StateUpdateKind::SourceReferences(_)
            | StateUpdateKind::AuditLog(_)
            | StateUpdateKind::StorageCollectionMetadata(_)
            | StateUpdateKind::UnfinalizedShard(_) => push_update(
                update,
                diff,
                &mut post_item_retractions,
                &mut post_item_additions,
            ),
        }
    }

    // Order builtin mappings by the index stored alongside each builtin in `BUILTIN_LOOKUP`
    // (presumably the builtin's position in a dependency-ordered list — TODO confirm).
    let builtin_item_updates = builtin_item_updates
        .into_iter()
        .map(|(system_object_mapping, ts, diff)| {
            let idx = BUILTIN_LOOKUP
                .get(&system_object_mapping.description)
                .expect("missing builtin")
                .0;
            (idx, system_object_mapping, ts, diff)
        })
        .sorted_by_key(|(idx, _, _, _)| *idx)
        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));

    // Split the sorted builtins into indexes/continual tasks and everything else, because the
    // two groups are placed on opposite sides of the cluster updates in the final order.
    let mut other_builtin_retractions = Vec::new();
    let mut other_builtin_additions = Vec::new();
    let mut builtin_index_retractions = Vec::new();
    let mut builtin_index_additions = Vec::new();
    for (builtin_item_update, ts, diff) in builtin_item_updates {
        match &builtin_item_update.description.object_type {
            CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
                StateUpdate {
                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
                    ts,
                    diff,
                },
                diff,
                &mut builtin_index_retractions,
                &mut builtin_index_additions,
            ),
            CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::Sink
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Type
            | CatalogItemType::Func
            | CatalogItemType::Secret
            | CatalogItemType::Connection => push_update(
                StateUpdate {
                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
                    ts,
                    diff,
                },
                diff,
                &mut other_builtin_retractions,
                &mut other_builtin_additions,
            ),
        }
    }

    /// Reorders `connections` in place so that every connection comes after the connections
    /// in the same batch that its `create_sql` depends on.
    ///
    /// Panics if `create_sql` cannot be parsed, if dependency extraction fails, or if the
    /// dependencies among the given connections contain a cycle.
    fn sort_connections(connections: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
        // Maps each connection update to the set of connections it is still waiting on.
        let mut topo: BTreeMap<
            (mz_catalog::durable::Item, Timestamp, StateDiff),
            BTreeSet<CatalogItemId>,
        > = BTreeMap::default();
        let existing_connections: BTreeSet<_> = connections.iter().map(|item| item.0.id).collect();

        tracing::debug!(?connections, "sorting connections");
        // Drain the input; it is refilled in dependency order further down.
        for (connection, ts, diff) in connections.drain(..) {
            let statement = mz_sql::parse::parse(&connection.create_sql)
                .expect("valid CONNECTION create_sql")
                .into_element()
                .ast;
            let mut dependencies = mz_sql::names::dependencies(&statement)
                .expect("failed to find dependencies of CONNECTION");
            // Ignore a self-dependency.
            dependencies.remove(&connection.id);
            // Only dependencies on connections within this batch constrain the order.
            dependencies.retain(|dep| existing_connections.contains(dep));

            // Every connection update is expected to appear at most once.
            assert_none!(topo.insert((connection, ts, diff), dependencies));
        }
        tracing::debug!(?topo, ?existing_connections, "built topological sort");

        while !topo.is_empty() {
            // Collect every connection whose remaining dependency set is empty.
            let no_deps: Vec<_> = topo
                .iter()
                .filter_map(|(item, deps)| {
                    if deps.is_empty() {
                        Some(item.clone())
                    } else {
                        None
                    }
                })
                .collect();

            // No progress is possible although entries remain: the dependencies are cyclic.
            if no_deps.is_empty() {
                panic!("programming error, cycle in Connections");
            }

            for item in no_deps {
                // Emit the connection and release everything that was waiting on it.
                topo.remove(&item);
                topo.values_mut().for_each(|deps| {
                    deps.remove(&item.0.id);
                });
                connections.push(item);
            }
        }
    }

    /// Orders durable item updates by item type and, within a type, by ID.
    ///
    /// The type order is: types, functions, secrets, connections, sources, tables, derived
    /// items (views, materialized views and indexes share one group), sinks, continual
    /// tasks. Connections are the exception to the by-ID rule: they are ordered by
    /// `sort_connections` instead.
    fn sort_item_updates(
        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
        // Partition the items by type.
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in item_updates {
            match update.0.item_type() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        // Within each group, order by ID. `connections` is deliberately absent from this
        // list; it is ordered separately below.
        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut sources,
            &mut tables,
            &mut derived_items,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        // Connections are ordered by their dependencies on each other rather than by ID.
        sort_connections(&mut connections);

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }

    let item_retractions = sort_item_updates(item_retractions);
    let item_additions = sort_item_updates(item_additions);

    /// Orders temporary item updates by item type and, within a type, by ID.
    ///
    /// Uses the same type order as `sort_item_updates`, but here connections are ordered by
    /// ID like every other group.
    fn sort_temp_item_updates(
        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
        // Partition the items by type.
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in temp_item_updates {
            match update.0.item.typ() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        // Within each group, order by ID.
        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut connections,
            &mut sources,
            &mut tables,
            &mut derived_items,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }
    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
    let temp_item_additions = sort_temp_item_updates(temp_item_additions);

    /// Interleaves durable and temporary item updates into one list of `StateUpdate`s.
    ///
    /// At each step the fronts of the two queues are compared and the update with the smaller
    /// ID is emitted; once one queue is exhausted the rest of the other is appended. The
    /// relative order within each input queue is preserved.
    ///
    /// Panics if an item and a temporary item have the same ID.
    fn merge_item_updates(
        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> Vec<StateUpdate> {
        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());

        while let (Some((item, _, _)), Some((temp_item, _, _))) =
            (item_updates.front(), temp_item_updates.front())
        {
            if item.id < temp_item.id {
                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::Item(item),
                    ts,
                    diff,
                });
            } else if item.id > temp_item.id {
                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::TemporaryItem(temp_item),
                    ts,
                    diff,
                });
            } else {
                unreachable!(
                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
                );
            }
        }

        // At most one of the two queues still has elements; append whatever is left.
        while let Some((item, ts, diff)) = item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::Item(item),
                ts,
                diff,
            });
        }

        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::TemporaryItem(temp_item),
                ts,
                diff,
            });
        }

        state_updates
    }
    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
    let item_additions = merge_item_updates(item_additions, temp_item_additions);

    // Stitch the partitions together. Retractions come first and mirror the additions: the
    // partitions appear in the opposite order and each one is reversed.
    iter::empty()
        .chain(post_item_retractions.into_iter().rev())
        .chain(item_retractions.into_iter().rev())
        .chain(builtin_index_retractions.into_iter().rev())
        .chain(cluster_retractions.into_iter().rev())
        .chain(other_builtin_retractions.into_iter().rev())
        .chain(pre_cluster_retractions.into_iter().rev())
        .chain(pre_cluster_additions)
        .chain(other_builtin_additions)
        .chain(cluster_additions)
        .chain(builtin_index_additions)
        .chain(item_additions)
        .chain(post_item_additions)
        .collect()
}
2224
/// A batch of consecutive updates of the same category, accumulated so that they can be
/// applied to the catalog state together.
///
/// `BootstrapApplyState::new` decides which variant a single update belongs to.
enum BootstrapApplyState {
    /// Additions of builtin views, already resolved to the static view definition and the
    /// IDs assigned to it.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Introspection source indexes, items, and those system object mappings that are not
    /// builtin view additions.
    Items(Vec<StateUpdate>),
    /// Every other kind of update.
    Updates(Vec<StateUpdate>),
}
2236
2237impl BootstrapApplyState {
2238    fn new(update: StateUpdate) -> BootstrapApplyState {
2239        match update {
2240            StateUpdate {
2241                kind: StateUpdateKind::SystemObjectMapping(system_object_mapping),
2242                diff: StateDiff::Addition,
2243                ..
2244            } if matches!(
2245                system_object_mapping.description.object_type,
2246                CatalogItemType::View
2247            ) =>
2248            {
2249                let view_addition = lookup_builtin_view_addition(system_object_mapping);
2250                BootstrapApplyState::BuiltinViewAdditions(vec![view_addition])
2251            }
2252            StateUpdate {
2253                kind: StateUpdateKind::IntrospectionSourceIndex(_),
2254                ..
2255            }
2256            | StateUpdate {
2257                kind: StateUpdateKind::SystemObjectMapping(_),
2258                ..
2259            }
2260            | StateUpdate {
2261                kind: StateUpdateKind::Item(_),
2262                ..
2263            } => BootstrapApplyState::Items(vec![update]),
2264            update => BootstrapApplyState::Updates(vec![update]),
2265        }
2266    }
2267
2268    async fn apply(
2274        self,
2275        state: &mut CatalogState,
2276        retractions: &mut InProgressRetractions,
2277        local_expression_cache: &mut LocalExpressionCache,
2278    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2279        match self {
2280            BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions) => {
2281                let restore = state.system_configuration.clone();
2282                state.system_configuration.enable_for_item_parsing();
2283                let builtin_table_updates = CatalogState::parse_builtin_views(
2284                    state,
2285                    builtin_view_additions,
2286                    retractions,
2287                    local_expression_cache,
2288                )
2289                .await;
2290                state.system_configuration = restore;
2291                builtin_table_updates
2292            }
2293            BootstrapApplyState::Items(updates) => state.with_enable_for_item_parsing(|state| {
2294                state
2295                    .apply_updates_inner(updates, retractions, local_expression_cache)
2296                    .expect("corrupt catalog")
2297            }),
2298            BootstrapApplyState::Updates(updates) => state
2299                .apply_updates_inner(updates, retractions, local_expression_cache)
2300                .expect("corrupt catalog"),
2301        }
2302    }
2303
2304    async fn step(
2305        self,
2306        next: BootstrapApplyState,
2307        state: &mut CatalogState,
2308        retractions: &mut InProgressRetractions,
2309        local_expression_cache: &mut LocalExpressionCache,
2310    ) -> (
2311        BootstrapApplyState,
2312        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2313    ) {
2314        match (self, next) {
2315            (
2316                BootstrapApplyState::BuiltinViewAdditions(mut builtin_view_additions),
2317                BootstrapApplyState::BuiltinViewAdditions(next_builtin_view_additions),
2318            ) => {
2319                builtin_view_additions.extend(next_builtin_view_additions);
2321                (
2322                    BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions),
2323                    Vec::new(),
2324                )
2325            }
2326            (BootstrapApplyState::Items(mut updates), BootstrapApplyState::Items(next_updates)) => {
2327                updates.extend(next_updates);
2329                (BootstrapApplyState::Items(updates), Vec::new())
2330            }
2331            (
2332                BootstrapApplyState::Updates(mut updates),
2333                BootstrapApplyState::Updates(next_updates),
2334            ) => {
2335                updates.extend(next_updates);
2337                (BootstrapApplyState::Updates(updates), Vec::new())
2338            }
2339            (apply_state, next_apply_state) => {
2340                let builtin_table_update = apply_state
2342                    .apply(state, retractions, local_expression_cache)
2343                    .await;
2344                (next_apply_state, builtin_table_update)
2345            }
2346        }
2347    }
2348}
2349
2350fn apply_inverted_lookup<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: V, diff: StateDiff)
2355where
2356    K: Ord + Clone + Debug,
2357    V: PartialEq + Debug,
2358{
2359    match diff {
2360        StateDiff::Retraction => {
2361            let prev = map.remove(key);
2362            assert_eq!(
2363                prev,
2364                Some(value),
2365                "retraction does not match existing value: {key:?}"
2366            );
2367        }
2368        StateDiff::Addition => {
2369            let prev = map.insert(key.clone(), value);
2370            assert_eq!(
2371                prev, None,
2372                "values must be explicitly retracted before inserting a new value: {key:?}"
2373            );
2374        }
2375    }
2376}
2377
2378fn apply_with_update<K, V, D>(
2381    map: &mut BTreeMap<K, V>,
2382    durable: D,
2383    key_fn: impl FnOnce(&D) -> K,
2384    diff: StateDiff,
2385    retractions: &mut BTreeMap<D::Key, V>,
2386) where
2387    K: Ord,
2388    V: UpdateFrom<D> + PartialEq + Debug,
2389    D: DurableType,
2390    D::Key: Ord,
2391{
2392    match diff {
2393        StateDiff::Retraction => {
2394            let mem_key = key_fn(&durable);
2395            let value = map
2396                .remove(&mem_key)
2397                .expect("retraction does not match existing value: {key:?}");
2398            let durable_key = durable.into_key_value().0;
2399            retractions.insert(durable_key, value);
2400        }
2401        StateDiff::Addition => {
2402            let mem_key = key_fn(&durable);
2403            let durable_key = durable.key();
2404            let value = match retractions.remove(&durable_key) {
2405                Some(mut retraction) => {
2406                    retraction.update_from(durable);
2407                    retraction
2408                }
2409                None => durable.into(),
2410            };
2411            let prev = map.insert(mem_key, value);
2412            assert_eq!(
2413                prev, None,
2414                "values must be explicitly retracted before inserting a new value"
2415            );
2416        }
2417    }
2418}
2419
2420fn lookup_builtin_view_addition(
2422    mapping: SystemObjectMapping,
2423) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2424    let (_, builtin) = BUILTIN_LOOKUP
2425        .get(&mapping.description)
2426        .expect("missing builtin view");
2427    let Builtin::View(view) = builtin else {
2428        unreachable!("programming error, expected BuiltinView found {builtin:?}");
2429    };
2430
2431    (
2432        view,
2433        mapping.unique_identifier.catalog_id,
2434        mapping.unique_identifier.global_id,
2435    )
2436}