1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25 BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28 ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29 SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
35 NetworkPolicy, Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36 TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
40use mz_controller_types::ClusterId;
41use mz_expr::MirScalarExpr;
42use mz_ore::collections::CollectionExt;
43use mz_ore::tracing::OpenTelemetryContext;
44use mz_ore::{assert_none, instrument, soft_assert_no_log};
45use mz_pgrepr::oid::INVALID_OID;
46use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
47use mz_repr::role_id::RoleId;
48use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
49use mz_sql::catalog::CatalogError as SqlCatalogError;
50use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
51use mz_sql::names::{
52 FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
53 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
54};
55use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
56use mz_sql::session::vars::{VarError, VarInput};
57use mz_sql::{plan, rbac};
58use mz_sql_parser::ast::Expr;
59use mz_storage_types::sources::Timeline;
60use tracing::{Instrument, info_span, warn};
61
62use crate::AdapterError;
63use crate::catalog::state::LocalExpressionCache;
64use crate::catalog::{BuiltinTableUpdate, CatalogState};
65use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
66use crate::util::index_sql;
67
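/// Tracks retractions while applying a single batch of same-timestamp updates.
///
/// An alteration of a catalog object arrives from the durable catalog as a
/// retraction of the old definition followed by an addition of the new one at
/// the same timestamp. The retracted in-memory objects are parked here so that
/// the matching addition can reuse them (for example to preserve dependency
/// bookkeeping) instead of rebuilding everything from scratch.
///
/// Illustrative shape of such a pair, with field contents elided:
///
/// ```text
/// StateUpdate { kind: Item(<old definition>), ts: 42, diff: Retraction }
/// StateUpdate { kind: Item(<new definition>), ts: 42, diff: Addition }
/// ```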
68#[derive(Debug, Clone, Default)]
80struct InProgressRetractions {
81 roles: BTreeMap<RoleKey, Role>,
82 role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
83 databases: BTreeMap<DatabaseKey, Database>,
84 schemas: BTreeMap<SchemaKey, Schema>,
85 clusters: BTreeMap<ClusterKey, Cluster>,
86 network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
87 items: BTreeMap<ItemKey, CatalogEntry>,
88 temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
89 introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
90 system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
91}
92
93impl CatalogState {
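    /// Applies `updates` to the in-memory catalog state during bootstrap.
    ///
    /// Updates are applied one timestamp at a time; within a timestamp they are
    /// folded through a `BootstrapApplyState` so that runs of related updates
    /// (such as builtin view definitions, see `parse_builtin_views`) can be
    /// batched before being applied.
    ///
    /// Returns the builtin table updates and parsed catalog updates produced
    /// while applying the batch.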
94 #[must_use]
101 #[instrument]
102 pub(crate) async fn apply_updates_for_bootstrap(
103 &mut self,
104 updates: Vec<StateUpdate>,
105 local_expression_cache: &mut LocalExpressionCache,
106 ) -> (
107 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
108 Vec<ParsedStateUpdate>,
109 ) {
110 let mut builtin_table_updates = Vec::with_capacity(updates.len());
111 let mut catalog_updates = Vec::with_capacity(updates.len());
112 let updates = sort_updates(updates);
113
114 let mut groups: Vec<Vec<_>> = Vec::new();
115 for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
116 groups.push(updates.collect());
117 }
118 for updates in groups {
119 let mut apply_state = BootstrapApplyState::Updates(Vec::new());
120 let mut retractions = InProgressRetractions::default();
121
122 for update in updates {
123 let next_apply_state = BootstrapApplyState::new(update);
124 let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
125 .step(
126 next_apply_state,
127 self,
128 &mut retractions,
129 local_expression_cache,
130 )
131 .await;
132 apply_state = next_apply_state;
133 builtin_table_updates.extend(builtin_table_update);
134 catalog_updates.extend(catalog_update);
135 }
136
137 let (builtin_table_update, catalog_update) = apply_state
139 .apply(self, &mut retractions, local_expression_cache)
140 .await;
141 builtin_table_updates.extend(builtin_table_update);
142 catalog_updates.extend(catalog_update);
143 }
144
145 (builtin_table_updates, catalog_updates)
146 }
147
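    /// Applies `updates` to the in-memory catalog state outside of bootstrap.
    ///
    /// The updates are consolidated (so addition/retraction pairs that cancel
    /// out disappear), sorted into dependency order, and then applied one
    /// timestamp at a time.
    ///
    /// Hypothetical call site, assuming `updates` is a batch read from the
    /// durable catalog (names are illustrative only):
    ///
    /// ```ignore
    /// let (builtin_table_updates, parsed_updates) = state.apply_updates(updates)?;
    /// ```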
148 #[instrument]
152 pub(crate) fn apply_updates(
153 &mut self,
154 updates: Vec<StateUpdate>,
155 ) -> Result<
156 (
157 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
158 Vec<ParsedStateUpdate>,
159 ),
160 CatalogError,
161 > {
162 let mut builtin_table_updates = Vec::with_capacity(updates.len());
163 let mut catalog_updates = Vec::with_capacity(updates.len());
164
165 let updates = Self::consolidate_updates(updates);
170
171 let updates = sort_updates(updates);
174
175 for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
176 let mut retractions = InProgressRetractions::default();
177 let (builtin_table_update, catalog_updates_op) = self.apply_updates_inner(
178 updates.collect(),
179 &mut retractions,
180 &mut LocalExpressionCache::Closed,
181 )?;
182 builtin_table_updates.extend(builtin_table_update);
183 catalog_updates.extend(catalog_updates_op);
184 }
185
186 Ok((builtin_table_updates, catalog_updates))
187 }
188
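    /// Consolidates `updates` so that updates with equal kind and timestamp are
    /// merged and anything with a net diff of zero is dropped, then converts the
    /// summed diffs back into `StateDiff`s (which only admit -1 and 1).
    ///
    /// A minimal illustration of the underlying consolidation, using plain
    /// tuples rather than `StateUpdate`:
    ///
    /// ```ignore
    /// use differential_dataflow::consolidation::consolidate_updates;
    ///
    /// // An addition and a retraction of the same data at the same timestamp
    /// // cancel out; unrelated updates are preserved.
    /// let mut updates = vec![("a", 1u64, 1i64), ("a", 1, -1), ("b", 1, 1)];
    /// consolidate_updates(&mut updates);
    /// assert_eq!(updates, vec![("b", 1, 1)]);
    /// ```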
189 fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
197 let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
198 .into_iter()
199 .map(|update| (update.kind, update.ts, update.diff.into()))
200 .collect_vec();
201
202 consolidate_updates(&mut updates);
203
204 updates
205 .into_iter()
206 .filter(|(_kind, _ts, diff)| *diff != 0.into())
207 .map(|(kind, ts, diff)| StateUpdate {
208 kind,
209 ts,
210 diff: diff
211 .try_into()
212 .expect("catalog state cannot have diff other than -1 or 1"),
213 })
214 .collect_vec()
215 }
216
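    /// Applies a batch of updates that all share the same timestamp.
    ///
    /// Note the asymmetry between the two diff directions: for retractions the
    /// parsed update and the builtin table retraction are produced *before* the
    /// in-memory state is modified (the retracted object must still be
    /// resolvable), whereas for additions the state is modified first so the
    /// new object can be resolved while producing them. If any system
    /// configuration values changed, dynamic configuration updates are
    /// propagated once at the end.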
217 #[instrument(level = "debug")]
218 fn apply_updates_inner(
219 &mut self,
220 updates: Vec<StateUpdate>,
221 retractions: &mut InProgressRetractions,
222 local_expression_cache: &mut LocalExpressionCache,
223 ) -> Result<
224 (
225 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
226 Vec<ParsedStateUpdate>,
227 ),
228 CatalogError,
229 > {
230 soft_assert_no_log!(
231 updates.iter().map(|update| update.ts).all_equal(),
232 "all timestamps should be equal: {updates:?}"
233 );
234
235 let mut update_system_config = false;
236
237 let mut builtin_table_updates = Vec::with_capacity(updates.len());
238 let mut catalog_updates = Vec::new();
239
240 for state_update in updates {
241 if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
242 update_system_config = true;
243 }
244
245 match state_update.diff {
246 StateDiff::Retraction => {
247 if let Some(update) =
251 parsed_state_updates::parse_state_update(self, state_update.clone())
252 {
253 catalog_updates.push(update);
254 }
255
256 builtin_table_updates.extend(self.generate_builtin_table_update(
259 state_update.kind.clone(),
260 state_update.diff,
261 ));
262 self.apply_update(
263 state_update.kind,
264 state_update.diff,
265 retractions,
266 local_expression_cache,
267 )?;
268 }
269 StateDiff::Addition => {
270 self.apply_update(
271 state_update.kind.clone(),
272 state_update.diff,
273 retractions,
274 local_expression_cache,
275 )?;
276 builtin_table_updates.extend(self.generate_builtin_table_update(
280 state_update.kind.clone(),
281 state_update.diff,
282 ));
283
284 if let Some(update) =
287 parsed_state_updates::parse_state_update(self, state_update.clone())
288 {
289 catalog_updates.push(update);
290 }
291 }
292 }
293 }
294
295 if update_system_config {
296 self.system_configuration.dyncfg_updates();
297 }
298
299 Ok((builtin_table_updates, catalog_updates))
300 }
301
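    /// Routes a single update to the handler for its `StateUpdateKind`,
    /// mutating the in-memory state accordingly.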
302 #[instrument(level = "debug")]
303 fn apply_update(
304 &mut self,
305 kind: StateUpdateKind,
306 diff: StateDiff,
307 retractions: &mut InProgressRetractions,
308 local_expression_cache: &mut LocalExpressionCache,
309 ) -> Result<(), CatalogError> {
310 match kind {
311 StateUpdateKind::Role(role) => {
312 self.apply_role_update(role, diff, retractions);
313 }
314 StateUpdateKind::RoleAuth(role_auth) => {
315 self.apply_role_auth_update(role_auth, diff, retractions);
316 }
317 StateUpdateKind::Database(database) => {
318 self.apply_database_update(database, diff, retractions);
319 }
320 StateUpdateKind::Schema(schema) => {
321 self.apply_schema_update(schema, diff, retractions);
322 }
323 StateUpdateKind::DefaultPrivilege(default_privilege) => {
324 self.apply_default_privilege_update(default_privilege, diff, retractions);
325 }
326 StateUpdateKind::SystemPrivilege(system_privilege) => {
327 self.apply_system_privilege_update(system_privilege, diff, retractions);
328 }
329 StateUpdateKind::SystemConfiguration(system_configuration) => {
330 self.apply_system_configuration_update(system_configuration, diff, retractions);
331 }
332 StateUpdateKind::Cluster(cluster) => {
333 self.apply_cluster_update(cluster, diff, retractions);
334 }
335 StateUpdateKind::NetworkPolicy(network_policy) => {
336 self.apply_network_policy_update(network_policy, diff, retractions);
337 }
338 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
339 self.apply_introspection_source_index_update(
340 introspection_source_index,
341 diff,
342 retractions,
343 );
344 }
345 StateUpdateKind::ClusterReplica(cluster_replica) => {
346 self.apply_cluster_replica_update(cluster_replica, diff, retractions);
347 }
348 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
349 self.apply_system_object_mapping_update(
350 system_object_mapping,
351 diff,
352 retractions,
353 local_expression_cache,
354 );
355 }
356 StateUpdateKind::TemporaryItem(item) => {
357 self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
358 }
359 StateUpdateKind::Item(item) => {
360 self.apply_item_update(item, diff, retractions, local_expression_cache)?;
361 }
362 StateUpdateKind::Comment(comment) => {
363 self.apply_comment_update(comment, diff, retractions);
364 }
365 StateUpdateKind::SourceReferences(source_reference) => {
366 self.apply_source_references_update(source_reference, diff, retractions);
367 }
368 StateUpdateKind::AuditLog(_audit_log) => {
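                // Audit log events are not reflected in the in-memory catalog
                // state; they only surface as builtin table updates (see
                // `generate_builtin_table_update`), so there is nothing to do
                // here.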
369 }
371 StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
372 self.apply_storage_collection_metadata_update(
373 storage_collection_metadata,
374 diff,
375 retractions,
376 );
377 }
378 StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
379 self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
380 }
381 }
382
383 Ok(())
384 }
385
386 #[instrument(level = "debug")]
387 fn apply_role_auth_update(
388 &mut self,
389 role_auth: mz_catalog::durable::RoleAuth,
390 diff: StateDiff,
391 retractions: &mut InProgressRetractions,
392 ) {
393 apply_with_update(
394 &mut self.role_auth_by_id,
395 role_auth,
396 |role_auth| role_auth.role_id,
397 diff,
398 &mut retractions.role_auths,
399 );
400 }
401
402 #[instrument(level = "debug")]
403 fn apply_role_update(
404 &mut self,
405 role: mz_catalog::durable::Role,
406 diff: StateDiff,
407 retractions: &mut InProgressRetractions,
408 ) {
409 apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
410 apply_with_update(
411 &mut self.roles_by_id,
412 role,
413 |role| role.id,
414 diff,
415 &mut retractions.roles,
416 );
417 }
418
419 #[instrument(level = "debug")]
420 fn apply_database_update(
421 &mut self,
422 database: mz_catalog::durable::Database,
423 diff: StateDiff,
424 retractions: &mut InProgressRetractions,
425 ) {
426 apply_inverted_lookup(
427 &mut self.database_by_name,
428 &database.name,
429 database.id,
430 diff,
431 );
432 apply_with_update(
433 &mut self.database_by_id,
434 database,
435 |database| database.id,
436 diff,
437 &mut retractions.databases,
438 );
439 }
440
441 #[instrument(level = "debug")]
442 fn apply_schema_update(
443 &mut self,
444 schema: mz_catalog::durable::Schema,
445 diff: StateDiff,
446 retractions: &mut InProgressRetractions,
447 ) {
448 let (schemas_by_id, schemas_by_name) = match &schema.database_id {
449 Some(database_id) => {
450 let db = self
451 .database_by_id
452 .get_mut(database_id)
453 .expect("catalog out of sync");
454 (&mut db.schemas_by_id, &mut db.schemas_by_name)
455 }
456 None => (
457 &mut self.ambient_schemas_by_id,
458 &mut self.ambient_schemas_by_name,
459 ),
460 };
461 apply_inverted_lookup(schemas_by_name, &schema.name, schema.id, diff);
462 apply_with_update(
463 schemas_by_id,
464 schema,
465 |schema| schema.id,
466 diff,
467 &mut retractions.schemas,
468 );
469 }
470
471 #[instrument(level = "debug")]
472 fn apply_default_privilege_update(
473 &mut self,
474 default_privilege: mz_catalog::durable::DefaultPrivilege,
475 diff: StateDiff,
476 _retractions: &mut InProgressRetractions,
477 ) {
478 match diff {
479 StateDiff::Addition => self
480 .default_privileges
481 .grant(default_privilege.object, default_privilege.acl_item),
482 StateDiff::Retraction => self
483 .default_privileges
484 .revoke(&default_privilege.object, &default_privilege.acl_item),
485 }
486 }
487
488 #[instrument(level = "debug")]
489 fn apply_system_privilege_update(
490 &mut self,
491 system_privilege: MzAclItem,
492 diff: StateDiff,
493 _retractions: &mut InProgressRetractions,
494 ) {
495 match diff {
496 StateDiff::Addition => self.system_privileges.grant(system_privilege),
497 StateDiff::Retraction => self.system_privileges.revoke(&system_privilege),
498 }
499 }
500
501 #[instrument(level = "debug")]
502 fn apply_system_configuration_update(
503 &mut self,
504 system_configuration: mz_catalog::durable::SystemConfiguration,
505 diff: StateDiff,
506 _retractions: &mut InProgressRetractions,
507 ) {
508 let res = match diff {
509 StateDiff::Addition => self.insert_system_configuration(
510 &system_configuration.name,
511 VarInput::Flat(&system_configuration.value),
512 ),
513 StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
514 };
515 match res {
516 Ok(_) => (),
517 Err(Error {
521 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
522 }) => {
523 warn!(%name, "unknown system parameter from catalog storage");
524 }
525 Err(e) => panic!("unable to update system variable: {e:?}"),
526 }
527 }
528
529 #[instrument(level = "debug")]
530 fn apply_cluster_update(
531 &mut self,
532 cluster: mz_catalog::durable::Cluster,
533 diff: StateDiff,
534 retractions: &mut InProgressRetractions,
535 ) {
536 apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
537 apply_with_update(
538 &mut self.clusters_by_id,
539 cluster,
540 |cluster| cluster.id,
541 diff,
542 &mut retractions.clusters,
543 );
544 }
545
546 #[instrument(level = "debug")]
547 fn apply_network_policy_update(
548 &mut self,
549 policy: mz_catalog::durable::NetworkPolicy,
550 diff: StateDiff,
551 retractions: &mut InProgressRetractions,
552 ) {
553 apply_inverted_lookup(
554 &mut self.network_policies_by_name,
555 &policy.name,
556 policy.id,
557 diff,
558 );
559 apply_with_update(
560 &mut self.network_policies_by_id,
561 policy,
562 |policy| policy.id,
563 diff,
564 &mut retractions.network_policies,
565 );
566 }
567
568 #[instrument(level = "debug")]
569 fn apply_introspection_source_index_update(
570 &mut self,
571 introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
572 diff: StateDiff,
573 retractions: &mut InProgressRetractions,
574 ) {
575 let cluster = self
576 .clusters_by_id
577 .get_mut(&introspection_source_index.cluster_id)
578 .expect("catalog out of sync");
579 let log = BUILTIN_LOG_LOOKUP
580 .get(introspection_source_index.name.as_str())
581 .expect("missing log");
582 apply_inverted_lookup(
583 &mut cluster.log_indexes,
584 &log.variant,
585 introspection_source_index.index_id,
586 diff,
587 );
588
589 match diff {
590 StateDiff::Addition => {
591 if let Some(mut entry) = retractions
592 .introspection_source_indexes
593 .remove(&introspection_source_index.item_id)
594 {
595 let (index_name, index) = self.create_introspection_source_index(
598 introspection_source_index.cluster_id,
599 log,
600 introspection_source_index.index_id,
601 );
602 assert_eq!(entry.id, introspection_source_index.item_id);
603 assert_eq!(entry.oid, introspection_source_index.oid);
604 assert_eq!(entry.name, index_name);
605 entry.item = index;
606 self.insert_entry(entry);
607 } else {
608 self.insert_introspection_source_index(
609 introspection_source_index.cluster_id,
610 log,
611 introspection_source_index.item_id,
612 introspection_source_index.index_id,
613 introspection_source_index.oid,
614 );
615 }
616 }
617 StateDiff::Retraction => {
618 let entry = self.drop_item(introspection_source_index.item_id);
619 retractions
620 .introspection_source_indexes
621 .insert(entry.id, entry);
622 }
623 }
624 }
625
626 #[instrument(level = "debug")]
627 fn apply_cluster_replica_update(
628 &mut self,
629 cluster_replica: mz_catalog::durable::ClusterReplica,
630 diff: StateDiff,
631 _retractions: &mut InProgressRetractions,
632 ) {
633 let cluster = self
634 .clusters_by_id
635 .get(&cluster_replica.cluster_id)
636 .expect("catalog out of sync");
637 let azs = cluster.availability_zones();
638 let location = self
639 .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
640 .expect("catalog in unexpected state");
641 let cluster = self
642 .clusters_by_id
643 .get_mut(&cluster_replica.cluster_id)
644 .expect("catalog out of sync");
645 apply_inverted_lookup(
646 &mut cluster.replica_id_by_name_,
647 &cluster_replica.name,
648 cluster_replica.replica_id,
649 diff,
650 );
651 match diff {
652 StateDiff::Retraction => {
653 let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
654 assert!(
655 prev.is_some(),
656 "retraction does not match existing value: {:?}",
657 cluster_replica.replica_id
658 );
659 }
660 StateDiff::Addition => {
661 let logging = ReplicaLogging {
662 log_logging: cluster_replica.config.logging.log_logging,
663 interval: cluster_replica.config.logging.interval,
664 };
665 let config = ReplicaConfig {
666 location,
667 compute: ComputeReplicaConfig { logging },
668 };
669 let mem_cluster_replica = ClusterReplica {
670 name: cluster_replica.name.clone(),
671 cluster_id: cluster_replica.cluster_id,
672 replica_id: cluster_replica.replica_id,
673 config,
674 owner_id: cluster_replica.owner_id,
675 };
676 let prev = cluster
677 .replicas_by_id_
678 .insert(cluster_replica.replica_id, mem_cluster_replica);
679 assert_eq!(
680 prev, None,
681 "values must be explicitly retracted before inserting a new value: {:?}",
682 cluster_replica.replica_id
683 );
684 }
685 }
686 }
687
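    /// Applies an update to a builtin object's `SystemObjectMapping`.
    ///
    /// Runtime-alterable builtins are skipped entirely here. For everything
    /// else, a retraction drops the entry and parks it in `retractions`, and an
    /// addition either revives a parked entry or builds the in-memory item from
    /// its `Builtin` definition, parsing the create SQL for builtin indexes,
    /// continual tasks, and connections.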
688 #[instrument(level = "debug")]
689 fn apply_system_object_mapping_update(
690 &mut self,
691 system_object_mapping: mz_catalog::durable::SystemObjectMapping,
692 diff: StateDiff,
693 retractions: &mut InProgressRetractions,
694 local_expression_cache: &mut LocalExpressionCache,
695 ) {
696 let item_id = system_object_mapping.unique_identifier.catalog_id;
697 let global_id = system_object_mapping.unique_identifier.global_id;
698
699 if system_object_mapping.unique_identifier.runtime_alterable() {
700 return;
704 }
705
706 if let StateDiff::Retraction = diff {
707 let entry = self.drop_item(item_id);
708 retractions.system_object_mappings.insert(item_id, entry);
709 return;
710 }
711
712 if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
713 self.insert_entry(entry);
718 return;
719 }
720
721 let builtin = BUILTIN_LOOKUP
722 .get(&system_object_mapping.description)
723 .expect("missing builtin")
724 .1;
725 let schema_name = builtin.schema();
726 let schema_id = self
727 .ambient_schemas_by_name
728 .get(schema_name)
729 .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
730 let name = QualifiedItemName {
731 qualifiers: ItemQualifiers {
732 database_spec: ResolvedDatabaseSpecifier::Ambient,
733 schema_spec: SchemaSpecifier::Id(*schema_id),
734 },
735 item: builtin.name().into(),
736 };
737 match builtin {
738 Builtin::Log(log) => {
739 let mut acl_items = vec![rbac::owner_privilege(
740 mz_sql::catalog::ObjectType::Source,
741 MZ_SYSTEM_ROLE_ID,
742 )];
743 acl_items.extend_from_slice(&log.access);
744 self.insert_item(
745 item_id,
746 log.oid,
747 name.clone(),
748 CatalogItem::Log(Log {
749 variant: log.variant,
750 global_id,
751 }),
752 MZ_SYSTEM_ROLE_ID,
753 PrivilegeMap::from_mz_acl_items(acl_items),
754 );
755 }
756
757 Builtin::Table(table) => {
758 let mut acl_items = vec![rbac::owner_privilege(
759 mz_sql::catalog::ObjectType::Table,
760 MZ_SYSTEM_ROLE_ID,
761 )];
762 acl_items.extend_from_slice(&table.access);
763
764 self.insert_item(
765 item_id,
766 table.oid,
767 name.clone(),
768 CatalogItem::Table(Table {
769 create_sql: None,
770 desc: VersionedRelationDesc::new(table.desc.clone()),
771 collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
772 conn_id: None,
773 resolved_ids: ResolvedIds::empty(),
774 custom_logical_compaction_window: table.is_retained_metrics_object.then(
775 || {
776 self.system_config()
777 .metrics_retention()
778 .try_into()
779 .expect("invalid metrics retention")
780 },
781 ),
782 is_retained_metrics_object: table.is_retained_metrics_object,
783 data_source: TableDataSource::TableWrites {
784 defaults: vec![Expr::null(); table.desc.arity()],
785 },
786 }),
787 MZ_SYSTEM_ROLE_ID,
788 PrivilegeMap::from_mz_acl_items(acl_items),
789 );
790 }
791 Builtin::Index(index) => {
792 let custom_logical_compaction_window =
793 index.is_retained_metrics_object.then(|| {
794 self.system_config()
795 .metrics_retention()
796 .try_into()
797 .expect("invalid metrics retention")
798 });
799 let versions = BTreeMap::new();
801
802 let item = self
803 .parse_item(
804 global_id,
805 &index.create_sql(),
806 &versions,
807 None,
808 index.is_retained_metrics_object,
809 custom_logical_compaction_window,
810 local_expression_cache,
811 None,
812 )
813 .unwrap_or_else(|e| {
814 panic!(
815 "internal error: failed to load bootstrap index:\n\
816 {}\n\
817 error:\n\
818 {:?}\n\n\
819 make sure that the schema name is specified in the builtin index's create sql statement.",
820 index.name, e
821 )
822 });
823 let CatalogItem::Index(_) = item else {
824 panic!(
825 "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
826 index.name
827 );
828 };
829
830 self.insert_item(
831 item_id,
832 index.oid,
833 name,
834 item,
835 MZ_SYSTEM_ROLE_ID,
836 PrivilegeMap::default(),
837 );
838 }
839 Builtin::View(_) => {
840 unreachable!("views added elsewhere");
842 }
843
844 Builtin::Type(typ) => {
846 let typ = self.resolve_builtin_type_references(typ);
847 if let CatalogType::Array { element_reference } = typ.details.typ {
848 let entry = self.get_entry_mut(&element_reference);
849 let item_type = match &mut entry.item {
850 CatalogItem::Type(item_type) => item_type,
851 _ => unreachable!("types can only reference other types"),
852 };
853 item_type.details.array_id = Some(item_id);
854 }
855
856 let schema_id = self.resolve_system_schema(typ.schema);
857
858 self.insert_item(
859 item_id,
860 typ.oid,
861 QualifiedItemName {
862 qualifiers: ItemQualifiers {
863 database_spec: ResolvedDatabaseSpecifier::Ambient,
864 schema_spec: SchemaSpecifier::Id(schema_id),
865 },
866 item: typ.name.to_owned(),
867 },
868 CatalogItem::Type(Type {
869 create_sql: None,
870 global_id,
871 details: typ.details.clone(),
872 resolved_ids: ResolvedIds::empty(),
873 }),
874 MZ_SYSTEM_ROLE_ID,
875 PrivilegeMap::from_mz_acl_items(vec![
876 rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
877 rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
878 ]),
879 );
880 }
881
882 Builtin::Func(func) => {
883 let oid = INVALID_OID;
887 self.insert_item(
888 item_id,
889 oid,
890 name.clone(),
891 CatalogItem::Func(Func {
892 inner: func.inner,
893 global_id,
894 }),
895 MZ_SYSTEM_ROLE_ID,
896 PrivilegeMap::default(),
897 );
898 }
899
900 Builtin::Source(coll) => {
901 let mut acl_items = vec![rbac::owner_privilege(
902 mz_sql::catalog::ObjectType::Source,
903 MZ_SYSTEM_ROLE_ID,
904 )];
905 acl_items.extend_from_slice(&coll.access);
906
907 self.insert_item(
908 item_id,
909 coll.oid,
910 name.clone(),
911 CatalogItem::Source(Source {
912 create_sql: None,
913 data_source: DataSourceDesc::Introspection(coll.data_source),
914 desc: coll.desc.clone(),
915 global_id,
916 timeline: Timeline::EpochMilliseconds,
917 resolved_ids: ResolvedIds::empty(),
918 custom_logical_compaction_window: coll.is_retained_metrics_object.then(
919 || {
920 self.system_config()
921 .metrics_retention()
922 .try_into()
923 .expect("invalid metrics retention")
924 },
925 ),
926 is_retained_metrics_object: coll.is_retained_metrics_object,
927 }),
928 MZ_SYSTEM_ROLE_ID,
929 PrivilegeMap::from_mz_acl_items(acl_items),
930 );
931 }
932 Builtin::ContinualTask(ct) => {
933 let mut acl_items = vec![rbac::owner_privilege(
934 mz_sql::catalog::ObjectType::Source,
935 MZ_SYSTEM_ROLE_ID,
936 )];
937 acl_items.extend_from_slice(&ct.access);
938 let versions = BTreeMap::new();
940
941 let item = self
942 .parse_item(
943 global_id,
944 &ct.create_sql(),
945 &versions,
946 None,
947 false,
948 None,
949 local_expression_cache,
950 None,
951 )
952 .unwrap_or_else(|e| {
953 panic!(
954 "internal error: failed to load bootstrap continual task:\n\
955 {}\n\
956 error:\n\
957 {:?}\n\n\
958 make sure that the schema name is specified in the builtin continual task's create sql statement.",
959 ct.name, e
960 )
961 });
962 let CatalogItem::ContinualTask(_) = &item else {
963 panic!(
964 "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
965 ct.name
966 );
967 };
968
969 self.insert_item(
970 item_id,
971 ct.oid,
972 name,
973 item,
974 MZ_SYSTEM_ROLE_ID,
975 PrivilegeMap::from_mz_acl_items(acl_items),
976 );
977 }
978 Builtin::Connection(connection) => {
979 let versions = BTreeMap::new();
981 let mut item = self
982 .parse_item(
983 global_id,
984 connection.sql,
985 &versions,
986 None,
987 false,
988 None,
989 local_expression_cache,
990 None,
991 )
992 .unwrap_or_else(|e| {
993 panic!(
994 "internal error: failed to load bootstrap connection:\n\
995 {}\n\
996 error:\n\
997 {:?}\n\n\
998 make sure that the schema name is specified in the builtin connection's create sql statement.",
999 connection.name, e
1000 )
1001 });
1002 let CatalogItem::Connection(_) = &mut item else {
1003 panic!(
1004 "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
1005 connection.name
1006 );
1007 };
1008
1009 let mut acl_items = vec![rbac::owner_privilege(
1010 mz_sql::catalog::ObjectType::Connection,
1011 connection.owner_id.clone(),
1012 )];
1013 acl_items.extend_from_slice(connection.access);
1014
1015 self.insert_item(
1016 item_id,
1017 connection.oid,
1018 name.clone(),
1019 item,
1020 connection.owner_id.clone(),
1021 PrivilegeMap::from_mz_acl_items(acl_items),
1022 );
1023 }
1024 }
1025 }
1026
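    /// Applies an update for a temporary (session-scoped) item.
    ///
    /// On addition, a matching parked retraction is reused when possible: if
    /// the create SQL is unchanged, the previously parsed item is kept as is
    /// (only metadata such as the name, owner, and privileges is refreshed),
    /// which avoids re-parsing the statement.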
1027 #[instrument(level = "debug")]
1028 fn apply_temporary_item_update(
1029 &mut self,
1030 temporary_item: TemporaryItem,
1031 diff: StateDiff,
1032 retractions: &mut InProgressRetractions,
1033 local_expression_cache: &mut LocalExpressionCache,
1034 ) {
1035 match diff {
1036 StateDiff::Addition => {
1037 let TemporaryItem {
1038 id,
1039 oid,
1040 global_id,
1041 schema_id,
1042 name,
1043 conn_id,
1044 create_sql,
1045 owner_id,
1046 privileges,
1047 extra_versions,
1048 } = temporary_item;
1049 let schema = self.find_temp_schema(&schema_id);
1050 let name = QualifiedItemName {
1051 qualifiers: ItemQualifiers {
1052 database_spec: schema.database().clone(),
1053 schema_spec: schema.id().clone(),
1054 },
1055 item: name.clone(),
1056 };
1057
1058 let entry = match retractions.temp_items.remove(&id) {
1059 Some(mut retraction) => {
1060 assert_eq!(retraction.id, id);
1061
1062 if retraction.create_sql() != create_sql {
1067 let mut catalog_item = self
1068 .deserialize_item(
1069 global_id,
1070 &create_sql,
1071 &extra_versions,
1072 local_expression_cache,
1073 Some(retraction.item),
1074 )
1075 .unwrap_or_else(|e| {
1076 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1077 });
1078 catalog_item.set_conn_id(conn_id);
1085 retraction.item = catalog_item;
1086 }
1087
1088 retraction.id = id;
1089 retraction.oid = oid;
1090 retraction.name = name;
1091 retraction.owner_id = owner_id;
1092 retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1093 retraction
1094 }
1095 None => {
1096 let mut catalog_item = self
1097 .deserialize_item(
1098 global_id,
1099 &create_sql,
1100 &extra_versions,
1101 local_expression_cache,
1102 None,
1103 )
1104 .unwrap_or_else(|e| {
1105 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1106 });
1107
1108 catalog_item.set_conn_id(conn_id);
1114
1115 CatalogEntry {
1116 item: catalog_item,
1117 referenced_by: Vec::new(),
1118 used_by: Vec::new(),
1119 id,
1120 oid,
1121 name,
1122 owner_id,
1123 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1124 }
1125 }
1126 };
1127 self.insert_entry(entry);
1128 }
1129 StateDiff::Retraction => {
1130 let entry = self.drop_item(temporary_item.id);
1131 retractions.temp_items.insert(temporary_item.id, entry);
1132 }
1133 }
1134 }
1135
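    /// Applies an update for a durable (non-temporary) item.
    ///
    /// On addition, a matching parked retraction is reused so that the
    /// dependency bookkeeping (`referenced_by`/`used_by`) survives the
    /// retract/add pair, and the previous in-memory item is handed to
    /// `deserialize_item` alongside the new create SQL.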
1136 #[instrument(level = "debug")]
1137 fn apply_item_update(
1138 &mut self,
1139 item: mz_catalog::durable::Item,
1140 diff: StateDiff,
1141 retractions: &mut InProgressRetractions,
1142 local_expression_cache: &mut LocalExpressionCache,
1143 ) -> Result<(), CatalogError> {
1144 match diff {
1145 StateDiff::Addition => {
1146 let key = item.key();
1147 let mz_catalog::durable::Item {
1148 id,
1149 oid,
1150 global_id,
1151 schema_id,
1152 name,
1153 create_sql,
1154 owner_id,
1155 privileges,
1156 extra_versions,
1157 } = item;
1158 let schema = self.find_non_temp_schema(&schema_id);
1159 let name = QualifiedItemName {
1160 qualifiers: ItemQualifiers {
1161 database_spec: schema.database().clone(),
1162 schema_spec: schema.id().clone(),
1163 },
1164 item: name.clone(),
1165 };
1166 let entry = match retractions.items.remove(&key) {
1167 Some(retraction) => {
1168 assert_eq!(retraction.id, item.id);
1169
1170 let item = self
1171 .deserialize_item(
1172 global_id,
1173 &create_sql,
1174 &extra_versions,
1175 local_expression_cache,
1176 Some(retraction.item),
1177 )
1178 .unwrap_or_else(|e| {
1179 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1180 });
1181
1182 CatalogEntry {
1183 item,
1184 id,
1185 oid,
1186 name,
1187 owner_id,
1188 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1189 referenced_by: retraction.referenced_by,
1190 used_by: retraction.used_by,
1191 }
1192 }
1193 None => {
1194 let catalog_item = self
1195 .deserialize_item(
1196 global_id,
1197 &create_sql,
1198 &extra_versions,
1199 local_expression_cache,
1200 None,
1201 )
1202 .unwrap_or_else(|e| {
1203 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1204 });
1205 CatalogEntry {
1206 item: catalog_item,
1207 referenced_by: Vec::new(),
1208 used_by: Vec::new(),
1209 id,
1210 oid,
1211 name,
1212 owner_id,
1213 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1214 }
1215 }
1216 };
1217
1218 self.insert_entry(entry);
1219 }
1220 StateDiff::Retraction => {
1221 let entry = self.drop_item(item.id);
1222 let key = item.into_key_value().0;
1223 retractions.items.insert(key, entry);
1224 }
1225 }
1226 Ok(())
1227 }
1228
1229 #[instrument(level = "debug")]
1230 fn apply_comment_update(
1231 &mut self,
1232 comment: mz_catalog::durable::Comment,
1233 diff: StateDiff,
1234 _retractions: &mut InProgressRetractions,
1235 ) {
1236 match diff {
1237 StateDiff::Addition => {
1238 let prev = self.comments.update_comment(
1239 comment.object_id,
1240 comment.sub_component,
1241 Some(comment.comment),
1242 );
1243 assert_eq!(
1244 prev, None,
1245 "values must be explicitly retracted before inserting a new value"
1246 );
1247 }
1248 StateDiff::Retraction => {
1249 let prev =
1250 self.comments
1251 .update_comment(comment.object_id, comment.sub_component, None);
1252 assert_eq!(
1253 prev,
1254 Some(comment.comment),
1255 "retraction does not match existing value: ({:?}, {:?})",
1256 comment.object_id,
1257 comment.sub_component,
1258 );
1259 }
1260 }
1261 }
1262
1263 #[instrument(level = "debug")]
1264 fn apply_source_references_update(
1265 &mut self,
1266 source_references: mz_catalog::durable::SourceReferences,
1267 diff: StateDiff,
1268 _retractions: &mut InProgressRetractions,
1269 ) {
1270 match diff {
1271 StateDiff::Addition => {
1272 let prev = self
1273 .source_references
1274 .insert(source_references.source_id, source_references.into());
1275 assert!(
1276 prev.is_none(),
1277 "values must be explicitly retracted before inserting a new value: {prev:?}"
1278 );
1279 }
1280 StateDiff::Retraction => {
1281 let prev = self.source_references.remove(&source_references.source_id);
1282 assert!(
1283 prev.is_some(),
                    "retraction of a non-existent value: {source_references:?}"
1285 );
1286 }
1287 }
1288 }
1289
1290 #[instrument(level = "debug")]
1291 fn apply_storage_collection_metadata_update(
1292 &mut self,
1293 storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1294 diff: StateDiff,
1295 _retractions: &mut InProgressRetractions,
1296 ) {
1297 apply_inverted_lookup(
1298 &mut self.storage_metadata.collection_metadata,
1299 &storage_collection_metadata.id,
1300 storage_collection_metadata.shard,
1301 diff,
1302 );
1303 }
1304
1305 #[instrument(level = "debug")]
1306 fn apply_unfinalized_shard_update(
1307 &mut self,
1308 unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1309 diff: StateDiff,
1310 _retractions: &mut InProgressRetractions,
1311 ) {
1312 match diff {
1313 StateDiff::Addition => {
1314 let newly_inserted = self
1315 .storage_metadata
1316 .unfinalized_shards
1317 .insert(unfinalized_shard.shard);
1318 assert!(
1319 newly_inserted,
1320 "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1321 );
1322 }
1323 StateDiff::Retraction => {
1324 let removed = self
1325 .storage_metadata
1326 .unfinalized_shards
1327 .remove(&unfinalized_shard.shard);
1328 assert!(
1329 removed,
1330 "retraction does not match existing value: {unfinalized_shard:?}"
1331 );
1332 }
1333 }
1334 }
1335
1336 #[instrument]
1339 pub(crate) fn generate_builtin_table_updates(
1340 &self,
1341 updates: Vec<StateUpdate>,
1342 ) -> Vec<BuiltinTableUpdate> {
1343 let mut builtin_table_updates = Vec::new();
1344 for StateUpdate { kind, ts: _, diff } in updates {
1345 let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1346 let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1347 builtin_table_updates.extend(builtin_table_update);
1348 }
1349 builtin_table_updates
1350 }
1351
1352 #[instrument(level = "debug")]
1355 pub(crate) fn generate_builtin_table_update(
1356 &self,
1357 kind: StateUpdateKind,
1358 diff: StateDiff,
1359 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1360 let diff = diff.into();
1361 match kind {
1362 StateUpdateKind::Role(role) => {
1363 let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1364 for group_id in role.membership.map.keys() {
1365 builtin_table_updates
1366 .push(self.pack_role_members_update(*group_id, role.id, diff))
1367 }
1368 builtin_table_updates
1369 }
1370 StateUpdateKind::RoleAuth(role_auth) => {
1371 vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1372 }
1373 StateUpdateKind::Database(database) => {
1374 vec![self.pack_database_update(&database.id, diff)]
1375 }
1376 StateUpdateKind::Schema(schema) => {
1377 let db_spec = schema.database_id.into();
1378 vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1379 }
1380 StateUpdateKind::DefaultPrivilege(default_privilege) => {
1381 vec![self.pack_default_privileges_update(
1382 &default_privilege.object,
1383 &default_privilege.acl_item.grantee,
1384 &default_privilege.acl_item.acl_mode,
1385 diff,
1386 )]
1387 }
1388 StateUpdateKind::SystemPrivilege(system_privilege) => {
1389 vec![self.pack_system_privileges_update(system_privilege, diff)]
1390 }
1391 StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1392 StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1393 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1394 self.pack_item_update(introspection_source_index.item_id, diff)
1395 }
1396 StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1397 cluster_replica.cluster_id,
1398 &cluster_replica.name,
1399 diff,
1400 ),
1401 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1402 if !system_object_mapping.unique_identifier.runtime_alterable() {
1406 self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1407 } else {
1408 vec![]
1409 }
1410 }
1411 StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1412 StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1413 StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1414 comment.object_id,
1415 comment.sub_component,
1416 &comment.comment,
1417 diff,
1418 )],
1419 StateUpdateKind::SourceReferences(source_references) => {
1420 self.pack_source_references_update(&source_references, diff)
1421 }
1422 StateUpdateKind::AuditLog(audit_log) => {
1423 vec![
1424 self.pack_audit_log_update(&audit_log.event, diff)
1425 .expect("could not pack audit log update"),
1426 ]
1427 }
1428 StateUpdateKind::NetworkPolicy(policy) => self
1429 .pack_network_policy_update(&policy.id, diff)
                .expect("could not pack network policy update"),
1431 StateUpdateKind::StorageCollectionMetadata(_)
1432 | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1433 }
1434 }
1435
1436 fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1437 self.entry_by_id
1438 .get_mut(id)
1439 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1440 }
1441
1442 fn get_schema_mut(
1443 &mut self,
1444 database_spec: &ResolvedDatabaseSpecifier,
1445 schema_spec: &SchemaSpecifier,
1446 conn_id: &ConnectionId,
1447 ) -> &mut Schema {
1448 match (database_spec, schema_spec) {
1450 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1451 .temporary_schemas
1452 .get_mut(conn_id)
1453 .expect("catalog out of sync"),
1454 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1455 .ambient_schemas_by_id
1456 .get_mut(id)
1457 .expect("catalog out of sync"),
1458 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1459 .database_by_id
1460 .get_mut(database_id)
1461 .expect("catalog out of sync")
1462 .schemas_by_id
1463 .get_mut(schema_id)
1464 .expect("catalog out of sync"),
1465 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1466 unreachable!("temporary schemas are in the ambient database")
1467 }
1468 }
1469 }
1470
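    /// Parses and installs a batch of builtin views.
    ///
    /// Views whose entries were parked by an earlier retraction are reinserted
    /// directly. The remaining views are parsed concurrently on background
    /// tasks; a view that fails to plan because one of its dependencies has not
    /// been installed yet is queued until that dependency completes, views that
    /// fail with an invalid-cast error are retried once everything else has
    /// settled, and any other failure is treated as a bug in the builtin
    /// definition and panics.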
1471 #[instrument(name = "catalog::parse_views")]
1481 async fn parse_builtin_views(
1482 state: &mut CatalogState,
1483 builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
1484 retractions: &mut InProgressRetractions,
1485 local_expression_cache: &mut LocalExpressionCache,
1486 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1487 let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
1488 let (updates, additions): (Vec<_>, Vec<_>) =
1489 builtin_views
1490 .into_iter()
1491 .partition_map(|(view, item_id, gid)| {
1492 match retractions.system_object_mappings.remove(&item_id) {
1493 Some(entry) => Either::Left(entry),
1494 None => Either::Right((view, item_id, gid)),
1495 }
1496 });
1497
1498 for entry in updates {
1499 let item_id = entry.id();
1504 state.insert_entry(entry);
1505 builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
1506 }
1507
1508 let mut handles = Vec::new();
1509 let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
1510 BTreeMap::new();
1511 let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
1512 let mut awaiting_all = Vec::new();
1515 let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
1517 let mut completed_names: BTreeSet<String> = BTreeSet::new();
1518
1519 let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
1521 .into_iter()
1522 .map(|(view, item_id, gid)| (item_id, (view, gid)))
1523 .collect();
1524 let item_ids: Vec<_> = views.keys().copied().collect();
1525
1526 let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
1527 while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
1528 if handles.is_empty() && ready.is_empty() {
1529 ready.extend(awaiting_all.drain(..));
1531 }
1532
1533 if !ready.is_empty() {
1535 let spawn_state = Arc::new(state.clone());
1536 while let Some(id) = ready.pop_front() {
1537 let (view, global_id) = views.get(&id).expect("must exist");
1538 let global_id = *global_id;
1539 let create_sql = view.create_sql();
1540 let versions = BTreeMap::new();
1542
1543 let span = info_span!(parent: None, "parse builtin view", name = view.name);
1544 OpenTelemetryContext::obtain().attach_as_parent_to(&span);
1545 let task_state = Arc::clone(&spawn_state);
1546 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1547 let handle = mz_ore::task::spawn(
1548 || "parse view",
1549 async move {
1550 let res = task_state.parse_item_inner(
1551 global_id,
1552 &create_sql,
1553 &versions,
1554 None,
1555 false,
1556 None,
1557 cached_expr,
1558 None,
1559 );
1560 (id, global_id, res)
1561 }
1562 .instrument(span),
1563 );
1564 handles.push(handle);
1565 }
1566 }
1567
1568 let (selected, _idx, remaining) = future::select_all(handles).await;
1570 handles = remaining;
1571 let (id, global_id, res) = selected;
1572 let mut insert_cached_expr = |cached_expr| {
1573 if let Some(cached_expr) = cached_expr {
1574 local_expression_cache.insert_cached_expression(global_id, cached_expr);
1575 }
1576 };
1577 match res {
1578 Ok((item, uncached_expr)) => {
1579 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1580 local_expression_cache.insert_uncached_expression(
1581 global_id,
1582 uncached_expr,
1583 optimizer_features,
1584 );
1585 }
1586 let (view, _gid) = views.remove(&id).expect("must exist");
1588 let schema_id = state
1589 .ambient_schemas_by_name
1590 .get(view.schema)
1591 .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
1592 let qname = QualifiedItemName {
1593 qualifiers: ItemQualifiers {
1594 database_spec: ResolvedDatabaseSpecifier::Ambient,
1595 schema_spec: SchemaSpecifier::Id(*schema_id),
1596 },
1597 item: view.name.into(),
1598 };
1599 let mut acl_items = vec![rbac::owner_privilege(
1600 mz_sql::catalog::ObjectType::View,
1601 MZ_SYSTEM_ROLE_ID,
1602 )];
1603 acl_items.extend_from_slice(&view.access);
1604
1605 state.insert_item(
1606 id,
1607 view.oid,
1608 qname,
1609 item,
1610 MZ_SYSTEM_ROLE_ID,
1611 PrivilegeMap::from_mz_acl_items(acl_items),
1612 );
1613
1614 let mut resolved_dependent_items = Vec::new();
1616 if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
1617 resolved_dependent_items.extend(dependent_items);
1618 }
1619 let entry = state.get_entry(&id);
1620 let full_name = state.resolve_full_name(entry.name(), None).to_string();
1621 if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
1622 resolved_dependent_items.extend(dependent_items);
1623 }
1624 ready.extend(resolved_dependent_items);
1625
1626 completed_ids.insert(id);
1627 completed_names.insert(full_name);
1628 }
1629 Err((
1631 AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
1632 cached_expr,
1633 )) => {
1634 insert_cached_expr(cached_expr);
1635 if completed_ids.contains(&missing_dep) {
1636 ready.push_back(id);
1637 } else {
1638 awaiting_id_dependencies
1639 .entry(missing_dep)
1640 .or_default()
1641 .push(id);
1642 }
1643 }
1644 Err((
1646 AdapterError::PlanError(plan::PlanError::Catalog(
1647 SqlCatalogError::UnknownItem(missing_dep),
1648 )),
1649 cached_expr,
1650 )) => {
1651 insert_cached_expr(cached_expr);
1652 match CatalogItemId::from_str(&missing_dep) {
1653 Ok(missing_dep) => {
1654 if completed_ids.contains(&missing_dep) {
1655 ready.push_back(id);
1656 } else {
1657 awaiting_id_dependencies
1658 .entry(missing_dep)
1659 .or_default()
1660 .push(id);
1661 }
1662 }
1663 Err(_) => {
1664 if completed_names.contains(&missing_dep) {
1665 ready.push_back(id);
1666 } else {
1667 awaiting_name_dependencies
1668 .entry(missing_dep)
1669 .or_default()
1670 .push(id);
1671 }
1672 }
1673 }
1674 }
1675 Err((
1676 AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
1677 cached_expr,
1678 )) => {
1679 insert_cached_expr(cached_expr);
1680 awaiting_all.push(id);
1681 }
1682 Err((e, _)) => {
1683 let (bad_view, _gid) = views.get(&id).expect("must exist");
1684 panic!(
1685 "internal error: failed to load bootstrap view:\n\
1686 {name}\n\
1687 error:\n\
1688 {e:?}\n\n\
1689 Make sure that the schema name is specified in the builtin view's create sql statement.
1690 ",
1691 name = bad_view.name,
1692 )
1693 }
1694 }
1695 }
1696
1697 assert!(awaiting_id_dependencies.is_empty());
1698 assert!(
1699 awaiting_name_dependencies.is_empty(),
1700 "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
1701 );
1702 assert!(awaiting_all.is_empty());
1703 assert!(views.is_empty());
1704
1705 builtin_table_updates.extend(
1707 item_ids
1708 .into_iter()
1709 .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
1710 );
1711
1712 builtin_table_updates
1713 }
1714
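    /// Installs `entry` into the in-memory state and wires up its dependency
    /// metadata: the owning cluster's `bound_objects` (for non-system items),
    /// the `referenced_by`/`used_by` lists of its dependencies, the global ID
    /// to item ID mapping, and the item's schema namespace (items, functions,
    /// or types), asserting that the name is not already taken.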
1715 fn insert_entry(&mut self, entry: CatalogEntry) {
1717 if !entry.id.is_system() {
1718 if let Some(cluster_id) = entry.item.cluster_id() {
1719 self.clusters_by_id
1720 .get_mut(&cluster_id)
1721 .expect("catalog out of sync")
1722 .bound_objects
1723 .insert(entry.id);
1724 };
1725 }
1726
1727 for u in entry.references().items() {
1728 match self.entry_by_id.get_mut(u) {
1729 Some(metadata) => metadata.referenced_by.push(entry.id()),
1730 None => panic!(
1731 "Catalog: missing dependent catalog item {} while installing {}",
1732 &u,
1733 self.resolve_full_name(entry.name(), entry.conn_id())
1734 ),
1735 }
1736 }
1737 for u in entry.uses() {
1738 if u == entry.id() {
1741 continue;
1742 }
1743 match self.entry_by_id.get_mut(&u) {
1744 Some(metadata) => metadata.used_by.push(entry.id()),
1745 None => panic!(
1746 "Catalog: missing dependent catalog item {} while installing {}",
1747 &u,
1748 self.resolve_full_name(entry.name(), entry.conn_id())
1749 ),
1750 }
1751 }
1752 for gid in entry.item.global_ids() {
1753 self.entry_by_global_id.insert(gid, entry.id());
1754 }
1755 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1756 let schema = self.get_schema_mut(
1757 &entry.name().qualifiers.database_spec,
1758 &entry.name().qualifiers.schema_spec,
1759 conn_id,
1760 );
1761
1762 let prev_id = match entry.item() {
1763 CatalogItem::Func(_) => schema
1764 .functions
1765 .insert(entry.name().item.clone(), entry.id()),
1766 CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1767 _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1768 };
1769
1770 assert!(
1771 prev_id.is_none(),
1772 "builtin name collision on {:?}",
1773 entry.name().item.clone()
1774 );
1775
1776 self.entry_by_id.insert(entry.id(), entry.clone());
1777 }
1778
1779 fn insert_item(
1781 &mut self,
1782 id: CatalogItemId,
1783 oid: u32,
1784 name: QualifiedItemName,
1785 item: CatalogItem,
1786 owner_id: RoleId,
1787 privileges: PrivilegeMap,
1788 ) {
1789 let entry = CatalogEntry {
1790 item,
1791 name,
1792 id,
1793 oid,
1794 used_by: Vec::new(),
1795 referenced_by: Vec::new(),
1796 owner_id,
1797 privileges,
1798 };
1799
1800 self.insert_entry(entry);
1801 }
1802
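    /// Removes the item with `id` from the in-memory state and returns its
    /// entry, undoing the bookkeeping performed by `insert_entry`.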
1803 #[mz_ore::instrument(level = "trace")]
1804 fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1805 let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1806 for u in metadata.references().items() {
1807 if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1808 dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1809 }
1810 }
1811 for u in metadata.uses() {
1812 if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1813 dep_metadata.used_by.retain(|u| *u != metadata.id())
1814 }
1815 }
1816 for gid in metadata.global_ids() {
1817 self.entry_by_global_id.remove(&gid);
1818 }
1819
1820 let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1821 let schema = self.get_schema_mut(
1822 &metadata.name().qualifiers.database_spec,
1823 &metadata.name().qualifiers.schema_spec,
1824 conn_id,
1825 );
1826 if metadata.item_type() == CatalogItemType::Type {
1827 schema
1828 .types
1829 .remove(&metadata.name().item)
1830 .expect("catalog out of sync");
1831 } else {
1832 assert_ne!(metadata.item_type(), CatalogItemType::Func);
1835
1836 schema
1837 .items
1838 .remove(&metadata.name().item)
1839 .expect("catalog out of sync");
1840 };
1841
1842 if !id.is_system() {
1843 if let Some(cluster_id) = metadata.item().cluster_id() {
1844 assert!(
1845 self.clusters_by_id
1846 .get_mut(&cluster_id)
1847 .expect("catalog out of sync")
1848 .bound_objects
1849 .remove(&id),
1850 "catalog out of sync"
1851 );
1852 }
1853 }
1854
1855 metadata
1856 }
1857
1858 fn insert_introspection_source_index(
1859 &mut self,
1860 cluster_id: ClusterId,
1861 log: &'static BuiltinLog,
1862 item_id: CatalogItemId,
1863 global_id: GlobalId,
1864 oid: u32,
1865 ) {
1866 let (index_name, index) =
1867 self.create_introspection_source_index(cluster_id, log, global_id);
1868 self.insert_item(
1869 item_id,
1870 oid,
1871 index_name,
1872 index,
1873 MZ_SYSTEM_ROLE_ID,
1874 PrivilegeMap::default(),
1875 );
1876 }
1877
1878 fn create_introspection_source_index(
1879 &self,
1880 cluster_id: ClusterId,
1881 log: &'static BuiltinLog,
1882 global_id: GlobalId,
1883 ) -> (QualifiedItemName, CatalogItem) {
1884 let source_name = FullItemName {
1885 database: RawDatabaseSpecifier::Ambient,
1886 schema: log.schema.into(),
1887 item: log.name.into(),
1888 };
1889 let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1890 let mut index_name = QualifiedItemName {
1891 qualifiers: ItemQualifiers {
1892 database_spec: ResolvedDatabaseSpecifier::Ambient,
1893 schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1894 },
1895 item: index_name.clone(),
1896 };
1897 index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1898 let index_item_name = index_name.item.clone();
1899 let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1900 let index = CatalogItem::Index(Index {
1901 global_id,
1902 on: log_global_id,
1903 keys: log
1904 .variant
1905 .index_by()
1906 .into_iter()
1907 .map(MirScalarExpr::column)
1908 .collect(),
1909 create_sql: index_sql(
1910 index_item_name,
1911 cluster_id,
1912 source_name,
1913 &log.variant.desc(),
1914 &log.variant.index_by(),
1915 ),
1916 conn_id: None,
1917 resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1918 cluster_id,
1919 is_retained_metrics_object: false,
1920 custom_logical_compaction_window: None,
1921 });
1922 (index_name, index)
1923 }
1924
1925 fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1930 Ok(self.system_configuration.set(name, value)?)
1931 }
1932
1933 fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1938 Ok(self.system_configuration.reset(name)?)
1939 }
1940}
1941
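/// Sorts `updates` into an order in which they can be applied safely: first by
/// timestamp, then each same-timestamp chunk is ordered by
/// `sort_updates_inner` so that dependencies within the chunk are respected.
///
/// A minimal illustration of the timestamp chunking done with
/// `Itertools::chunk_by`, using plain tuples rather than `StateUpdate`:
///
/// ```ignore
/// use itertools::Itertools;
///
/// // Consecutive entries with equal timestamps form one chunk.
/// let updates = vec![(1u64, "a"), (1, "b"), (2, "c")];
/// let mut chunks = Vec::new();
/// for (ts, chunk) in &updates.into_iter().chunk_by(|(ts, _)| *ts) {
///     chunks.push((ts, chunk.collect::<Vec<_>>()));
/// }
/// assert_eq!(chunks, vec![(1, vec![(1, "a"), (1, "b")]), (2, vec![(2, "c")])]);
/// ```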
1942fn sort_updates(mut updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1944 let mut sorted_updates = Vec::with_capacity(updates.len());
1945
1946 updates.sort_by_key(|update| update.ts);
1947 for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
1948 let sorted_ts_updates = sort_updates_inner(updates.collect());
1949 sorted_updates.extend(sorted_ts_updates);
1950 }
1951
1952 sorted_updates
1953}
1954
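/// Sorts updates that share a single timestamp into an order that respects the
/// dependencies between kinds. Roughly: roles, databases, schemas, privileges,
/// system configuration, and network policies come before clusters, cluster
/// replicas, and introspection source indexes; builtin object mappings are
/// ordered by their position in `BUILTIN_LOOKUP`, with builtin indexes and
/// continual tasks handled separately from the rest; durable and temporary
/// items get their own ordering (see `sort_item_updates` and
/// `sort_temp_item_updates`); and comments, source references, audit logs, and
/// storage metadata come last. Retractions and additions are collected
/// separately so the final ordering can apply them in the appropriate relative
/// order.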
1955fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1957 fn push_update<T>(
1958 update: T,
1959 diff: StateDiff,
1960 retractions: &mut Vec<T>,
1961 additions: &mut Vec<T>,
1962 ) {
1963 match diff {
1964 StateDiff::Retraction => retractions.push(update),
1965 StateDiff::Addition => additions.push(update),
1966 }
1967 }
1968
1969 soft_assert_no_log!(
1970 updates.iter().map(|update| update.ts).all_equal(),
1971 "all timestamps should be equal: {updates:?}"
1972 );
1973
1974 let mut pre_cluster_retractions = Vec::new();
1976 let mut pre_cluster_additions = Vec::new();
1977 let mut cluster_retractions = Vec::new();
1978 let mut cluster_additions = Vec::new();
1979 let mut builtin_item_updates = Vec::new();
1980 let mut item_retractions = Vec::new();
1981 let mut item_additions = Vec::new();
1982 let mut temp_item_retractions = Vec::new();
1983 let mut temp_item_additions = Vec::new();
1984 let mut post_item_retractions = Vec::new();
1985 let mut post_item_additions = Vec::new();
1986 for update in updates {
1987 let diff = update.diff.clone();
1988 match update.kind {
1989 StateUpdateKind::Role(_)
1990 | StateUpdateKind::RoleAuth(_)
1991 | StateUpdateKind::Database(_)
1992 | StateUpdateKind::Schema(_)
1993 | StateUpdateKind::DefaultPrivilege(_)
1994 | StateUpdateKind::SystemPrivilege(_)
1995 | StateUpdateKind::SystemConfiguration(_)
1996 | StateUpdateKind::NetworkPolicy(_) => push_update(
1997 update,
1998 diff,
1999 &mut pre_cluster_retractions,
2000 &mut pre_cluster_additions,
2001 ),
2002 StateUpdateKind::Cluster(_)
2003 | StateUpdateKind::IntrospectionSourceIndex(_)
2004 | StateUpdateKind::ClusterReplica(_) => push_update(
2005 update,
2006 diff,
2007 &mut cluster_retractions,
2008 &mut cluster_additions,
2009 ),
2010 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
2011 builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
2012 }
2013 StateUpdateKind::TemporaryItem(item) => push_update(
2014 (item, update.ts, update.diff),
2015 diff,
2016 &mut temp_item_retractions,
2017 &mut temp_item_additions,
2018 ),
2019 StateUpdateKind::Item(item) => push_update(
2020 (item, update.ts, update.diff),
2021 diff,
2022 &mut item_retractions,
2023 &mut item_additions,
2024 ),
2025 StateUpdateKind::Comment(_)
2026 | StateUpdateKind::SourceReferences(_)
2027 | StateUpdateKind::AuditLog(_)
2028 | StateUpdateKind::StorageCollectionMetadata(_)
2029 | StateUpdateKind::UnfinalizedShard(_) => push_update(
2030 update,
2031 diff,
2032 &mut post_item_retractions,
2033 &mut post_item_additions,
2034 ),
2035 }
2036 }
2037
2038 let builtin_item_updates = builtin_item_updates
2040 .into_iter()
2041 .map(|(system_object_mapping, ts, diff)| {
2042 let idx = BUILTIN_LOOKUP
2043 .get(&system_object_mapping.description)
2044 .expect("missing builtin")
2045 .0;
2046 (idx, system_object_mapping, ts, diff)
2047 })
2048 .sorted_by_key(|(idx, _, _, _)| *idx)
2049 .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2050
2051 let mut other_builtin_retractions = Vec::new();
2053 let mut other_builtin_additions = Vec::new();
2054 let mut builtin_index_retractions = Vec::new();
2055 let mut builtin_index_additions = Vec::new();
2056 for (builtin_item_update, ts, diff) in builtin_item_updates {
2057 match &builtin_item_update.description.object_type {
2058 CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
2059 StateUpdate {
2060 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2061 ts,
2062 diff,
2063 },
2064 diff,
2065 &mut builtin_index_retractions,
2066 &mut builtin_index_additions,
2067 ),
2068 CatalogItemType::Table
2069 | CatalogItemType::Source
2070 | CatalogItemType::Sink
2071 | CatalogItemType::View
2072 | CatalogItemType::MaterializedView
2073 | CatalogItemType::Type
2074 | CatalogItemType::Func
2075 | CatalogItemType::Secret
2076 | CatalogItemType::Connection => push_update(
2077 StateUpdate {
2078 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2079 ts,
2080 diff,
2081 },
2082 diff,
2083 &mut other_builtin_retractions,
2084 &mut other_builtin_additions,
2085 ),
2086 }
2087 }
2088
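    // Sorts connection items so that every connection appears after the
    // connections it depends on. Dependencies are extracted from the parsed
    // `CREATE CONNECTION` statement and restricted to connections in this same
    // batch; the loop below then repeatedly peels off connections with no
    // remaining dependencies (Kahn-style) and panics if a cycle is detected.
    // For example, a Kafka connection that uses an SSH tunnel connection must
    // be applied after the tunnel connection it references.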
    fn sort_connections(connections: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
        // Map each connection update to the set of other connections in this batch that it
        // depends on.
        let mut topo: BTreeMap<
            (mz_catalog::durable::Item, Timestamp, StateDiff),
            BTreeSet<CatalogItemId>,
        > = BTreeMap::default();
        let existing_connections: BTreeSet<_> = connections.iter().map(|item| item.0.id).collect();

        tracing::debug!(?connections, "sorting connections");
        for (connection, ts, diff) in connections.drain(..) {
            let statement = mz_sql::parse::parse(&connection.create_sql)
                .expect("valid CONNECTION create_sql")
                .into_element()
                .ast;
            let mut dependencies = mz_sql::names::dependencies(&statement)
                .expect("failed to find dependencies of CONNECTION");
            dependencies.remove(&connection.id);
            // Only track dependencies on connections that are part of this batch of updates.
            dependencies.retain(|dep| existing_connections.contains(dep));

            assert_none!(topo.insert((connection, ts, diff), dependencies));
        }
        tracing::debug!(?topo, ?existing_connections, "built topological sort");

        // Repeatedly emit every connection that has no remaining dependencies.
        while !topo.is_empty() {
            let no_deps: Vec<_> = topo
                .iter()
                .filter_map(|(item, deps)| {
                    if deps.is_empty() {
                        Some(item.clone())
                    } else {
                        None
                    }
                })
                .collect();

            if no_deps.is_empty() {
                panic!("programming error, cycle in Connections");
            }

            for item in no_deps {
                topo.remove(&item);
                // This connection has been emitted, so remove it from the remaining
                // dependency sets.
                topo.values_mut().for_each(|deps| {
                    deps.remove(&item.0.id);
                });
                connections.push(item);
            }
        }
    }

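    /// Sorts item updates into a dependency-safe order: types, functions, secrets, connections,
    /// sources, tables, derived items (views, materialized views, indexes), sinks, and finally
    /// continual tasks. Within each group, items are ordered by ID, except connections, which
    /// are sorted topologically.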
    fn sort_item_updates(
        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in item_updates {
            match update.0.item_type() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut sources,
            &mut tables,
            &mut derived_items,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

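        // Connections can reference other connections (for example, an SSH tunnel connection),
        // so they are ordered topologically instead of by ID.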
        sort_connections(&mut connections);

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }

    let item_retractions = sort_item_updates(item_retractions);
    let item_additions = sort_item_updates(item_additions);

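    /// Like `sort_item_updates`, but for temporary items. Every group, including connections, is
    /// ordered by ID.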
    fn sort_temp_item_updates(
        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in temp_item_updates {
            match update.0.item_type() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut connections,
            &mut sources,
            &mut tables,
            &mut derived_items,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }
    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
    let temp_item_additions = sort_temp_item_updates(temp_item_additions);

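    /// Merges the already-sorted durable and temporary item updates into a single list of
    /// `StateUpdate`s, interleaving them in ascending ID order.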
    fn merge_item_updates(
        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> Vec<StateUpdate> {
        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());

        while let (Some((item, _, _)), Some((temp_item, _, _))) =
            (item_updates.front(), temp_item_updates.front())
        {
            if item.id < temp_item.id {
                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::Item(item),
                    ts,
                    diff,
                });
            } else if item.id > temp_item.id {
                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::TemporaryItem(temp_item),
                    ts,
                    diff,
                });
            } else {
                unreachable!(
                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
                );
            }
        }

        while let Some((item, ts, diff)) = item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::Item(item),
                ts,
                diff,
            });
        }

        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::TemporaryItem(temp_item),
                ts,
                diff,
            });
        }

        state_updates
    }
    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
    let item_additions = merge_item_updates(item_additions, temp_item_additions);

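    // Put everything back together: retractions run in reverse dependency order, followed by
    // additions in dependency order.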
    iter::empty()
        .chain(post_item_retractions.into_iter().rev())
        .chain(item_retractions.into_iter().rev())
        .chain(builtin_index_retractions.into_iter().rev())
        .chain(cluster_retractions.into_iter().rev())
        .chain(other_builtin_retractions.into_iter().rev())
        .chain(pre_cluster_retractions.into_iter().rev())
        .chain(pre_cluster_additions)
        .chain(other_builtin_additions)
        .chain(cluster_additions)
        .chain(builtin_index_additions)
        .chain(item_additions)
        .chain(post_item_additions)
        .collect()
}

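/// Accumulates consecutive bootstrap state updates of the same kind so that they can be applied
/// to the catalog in batches.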
enum BootstrapApplyState {
    /// Additions of builtin views, which are parsed and applied as a single batch.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Item updates that are applied with item-parsing configuration enabled.
    Items(Vec<StateUpdate>),
    /// All other updates.
    Updates(Vec<StateUpdate>),
}

impl BootstrapApplyState {
    /// Creates a batch containing the single `update`, choosing the batch kind from the kind of
    /// the update.
    fn new(update: StateUpdate) -> BootstrapApplyState {
        match update {
            StateUpdate {
                kind: StateUpdateKind::SystemObjectMapping(system_object_mapping),
                diff: StateDiff::Addition,
                ..
            } if matches!(
                system_object_mapping.description.object_type,
                CatalogItemType::View
            ) =>
            {
                let view_addition = lookup_builtin_view_addition(system_object_mapping);
                BootstrapApplyState::BuiltinViewAdditions(vec![view_addition])
            }
            StateUpdate {
                kind: StateUpdateKind::IntrospectionSourceIndex(_),
                ..
            }
            | StateUpdate {
                kind: StateUpdateKind::SystemObjectMapping(_),
                ..
            }
            | StateUpdate {
                kind: StateUpdateKind::Item(_),
                ..
            } => BootstrapApplyState::Items(vec![update]),
            update => BootstrapApplyState::Updates(vec![update]),
        }
    }

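    /// Applies the accumulated batch of updates to `state`, returning any builtin table updates
    /// and parsed state updates they generate. Builtin views and items are parsed with
    /// item-parsing configuration temporarily enabled.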
    async fn apply(
        self,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
        Vec<ParsedStateUpdate>,
    ) {
        match self {
            BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions) => {
                let restore = state.system_configuration.clone();
                state.system_configuration.enable_for_item_parsing();
                let builtin_table_updates = CatalogState::parse_builtin_views(
                    state,
                    builtin_view_additions,
                    retractions,
                    local_expression_cache,
                )
                .await;
                state.system_configuration = restore;
                (builtin_table_updates, Vec::new())
            }
            BootstrapApplyState::Items(updates) => state.with_enable_for_item_parsing(|state| {
                state
                    .apply_updates_inner(updates, retractions, local_expression_cache)
                    .expect("corrupt catalog")
            }),
            BootstrapApplyState::Updates(updates) => state
                .apply_updates_inner(updates, retractions, local_expression_cache)
                .expect("corrupt catalog"),
        }
    }

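    /// Combines `next` into the current batch if it is the same kind of update; otherwise,
    /// applies the current batch to `state` and starts a new batch from `next`.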
    async fn step(
        self,
        next: BootstrapApplyState,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        BootstrapApplyState,
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
    ) {
        match (self, next) {
            (
                BootstrapApplyState::BuiltinViewAdditions(mut builtin_view_additions),
                BootstrapApplyState::BuiltinViewAdditions(next_builtin_view_additions),
            ) => {
                builtin_view_additions.extend(next_builtin_view_additions);
                (
                    BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions),
                    (Vec::new(), Vec::new()),
                )
            }
            (BootstrapApplyState::Items(mut updates), BootstrapApplyState::Items(next_updates)) => {
                updates.extend(next_updates);
                (
                    BootstrapApplyState::Items(updates),
                    (Vec::new(), Vec::new()),
                )
            }
            (
                BootstrapApplyState::Updates(mut updates),
                BootstrapApplyState::Updates(next_updates),
            ) => {
                updates.extend(next_updates);
                (
                    BootstrapApplyState::Updates(updates),
                    (Vec::new(), Vec::new()),
                )
            }
            (apply_state, next_apply_state) => {
                // The update kind changed: flush the accumulated batch before starting a new one.
                let updates = apply_state
                    .apply(state, retractions, local_expression_cache)
                    .await;
                (next_apply_state, updates)
            }
        }
    }
}

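/// Applies a single addition or retraction to the lookup `map`, asserting that a retraction
/// removes exactly the existing value and that an addition never overwrites an existing entry.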
fn apply_inverted_lookup<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: V, diff: StateDiff)
where
    K: Ord + Clone + Debug,
    V: PartialEq + Debug,
{
    match diff {
        StateDiff::Retraction => {
            let prev = map.remove(key);
            assert_eq!(
                prev,
                Some(value),
                "retraction does not match existing value: {key:?}"
            );
        }
        StateDiff::Addition => {
            let prev = map.insert(key.clone(), value);
            assert_eq!(
                prev, None,
                "values must be explicitly retracted before inserting a new value: {key:?}"
            );
        }
    }
}

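/// Applies an addition or retraction of the durable object `durable` to the in-memory `map`.
/// Retracted values are stashed in `retractions` so that a later addition with the same durable
/// key can update the previously retracted value in place via `UpdateFrom`.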
fn apply_with_update<K, V, D>(
    map: &mut BTreeMap<K, V>,
    durable: D,
    key_fn: impl FnOnce(&D) -> K,
    diff: StateDiff,
    retractions: &mut BTreeMap<D::Key, V>,
) where
    K: Ord,
    V: UpdateFrom<D> + PartialEq + Debug,
    D: DurableType,
    D::Key: Ord,
{
    match diff {
        StateDiff::Retraction => {
            let mem_key = key_fn(&durable);
            let value = map
                .remove(&mem_key)
                .expect("retraction does not match existing value");
            let durable_key = durable.into_key_value().0;
            retractions.insert(durable_key, value);
        }
        StateDiff::Addition => {
            let mem_key = key_fn(&durable);
            let durable_key = durable.key();
            let value = match retractions.remove(&durable_key) {
                Some(mut retraction) => {
                    retraction.update_from(durable);
                    retraction
                }
                None => durable.into(),
            };
            let prev = map.insert(mem_key, value);
            assert_eq!(
                prev, None,
                "values must be explicitly retracted before inserting a new value"
            );
        }
    }
}

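/// Resolves `mapping` to its `BuiltinView` via `BUILTIN_LOOKUP`, returning the view along with
/// its catalog item ID and global ID. Panics if the mapping does not refer to a builtin view.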
fn lookup_builtin_view_addition(
    mapping: SystemObjectMapping,
) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
    let (_, builtin) = BUILTIN_LOOKUP
        .get(&mapping.description)
        .expect("missing builtin view");
    let Builtin::View(view) = builtin else {
        unreachable!("programming error, expected BuiltinView found {builtin:?}");
    };

    (
        view,
        mapping.unique_identifier.catalog_id,
        mapping.unique_identifier.global_id,
    )
}