1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25 BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28 ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29 SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, Func, Index, Log, NetworkPolicy,
35 Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36 TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
40use mz_controller_types::ClusterId;
41use mz_expr::MirScalarExpr;
42use mz_ore::collections::CollectionExt;
43use mz_ore::tracing::OpenTelemetryContext;
44use mz_ore::{instrument, soft_assert_no_log};
45use mz_pgrepr::oid::INVALID_OID;
46use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
47use mz_repr::role_id::RoleId;
48use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
49use mz_sql::catalog::CatalogError as SqlCatalogError;
50use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
51use mz_sql::names::{
52 FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
53 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
54};
55use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
56use mz_sql::session::vars::{VarError, VarInput};
57use mz_sql::{plan, rbac};
58use mz_sql_parser::ast::Expr;
59use mz_storage_types::sources::Timeline;
60use tracing::{info_span, warn};
61
62use crate::AdapterError;
63use crate::catalog::state::LocalExpressionCache;
64use crate::catalog::{BuiltinTableUpdate, CatalogState};
65use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
66use crate::util::{index_sql, sort_topological};
67
/// Catalog objects that have been retracted while applying a batch of updates
/// but that may be re-inserted by a later addition in the same batch.
///
/// Stashing the retracted in-memory values lets an addition that "re-keys" an
/// existing object reuse the previously built value (e.g. `apply_item_update`
/// carries over `referenced_by`/`used_by` from the stashed entry) instead of
/// rebuilding it from scratch.
#[derive(Debug, Clone, Default)]
struct InProgressRetractions {
    // Durable-keyed collections, keyed by their durable catalog keys.
    roles: BTreeMap<RoleKey, Role>,
    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
    databases: BTreeMap<DatabaseKey, Database>,
    schemas: BTreeMap<SchemaKey, Schema>,
    clusters: BTreeMap<ClusterKey, Cluster>,
    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
    items: BTreeMap<ItemKey, CatalogEntry>,
    // Item-like collections, keyed directly by `CatalogItemId`.
    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}
92
93impl CatalogState {
    /// Applies `updates` to the in-memory catalog state, returning the builtin
    /// table updates and parsed catalog updates implied by the changes.
    ///
    /// Updates are consolidated first, then applied one timestamp group at a
    /// time. Within a group, retractions are stashed in
    /// [`InProgressRetractions`] so that a matching addition in the same group
    /// can reuse the retracted in-memory object.
    #[must_use]
    #[instrument]
    pub(crate) async fn apply_updates(
        &mut self,
        updates: Vec<StateUpdate>,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
        Vec<ParsedStateUpdate>,
    ) {
        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::with_capacity(updates.len());

        // Cancel out matching addition/retraction pairs up front so we never
        // apply updates that would immediately be undone.
        let updates = Self::consolidate_updates(updates);

        // Group updates by timestamp; each group is sorted by `sort_updates`
        // into an order that is safe to apply.
        let mut groups: Vec<Vec<_>> = Vec::new();
        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
            let updates = sort_updates(updates.collect());
            groups.push(updates);
        }

        for updates in groups {
            let mut apply_state = ApplyState::Updates(Vec::new());
            let mut retractions = InProgressRetractions::default();

            for update in updates {
                // `step` combines the next update with the in-progress state,
                // yielding the successor state plus any updates produced by
                // work applied along the way.
                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
                    .step(
                        ApplyState::new(update),
                        self,
                        &mut retractions,
                        local_expression_cache,
                    )
                    .await;
                apply_state = next_apply_state;
                builtin_table_updates.extend(builtin_table_update);
                catalog_updates.extend(catalog_update);
            }

            // Flush whatever is still buffered in the final apply state for
            // this timestamp group.
            let (builtin_table_update, catalog_update) = apply_state
                .apply(self, &mut retractions, local_expression_cache)
                .await;
            builtin_table_updates.extend(builtin_table_update);
            catalog_updates.extend(catalog_update);
        }

        (builtin_table_updates, catalog_updates)
    }
154
155 fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
163 let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
164 .into_iter()
165 .map(|update| (update.kind, update.ts, update.diff.into()))
166 .collect_vec();
167
168 consolidate_updates(&mut updates);
169
170 updates
171 .into_iter()
172 .map(|(kind, ts, diff)| StateUpdate {
173 kind,
174 ts,
175 diff: diff
176 .try_into()
177 .expect("catalog state cannot have diff other than -1 or 1"),
178 })
179 .collect_vec()
180 }
181
    /// Applies a single timestamp's worth of `updates` to `self`, returning
    /// the implied builtin table updates and parsed catalog updates.
    ///
    /// All `updates` must share the same timestamp (soft-asserted below).
    #[instrument(level = "debug")]
    fn apply_updates_inner(
        &mut self,
        updates: Vec<StateUpdate>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        soft_assert_no_log!(
            updates.iter().map(|update| update.ts).all_equal(),
            "all timestamps should be equal: {updates:?}"
        );

        // Dyncfg propagation is deferred until the end, once every system
        // configuration update in this batch has been applied.
        let mut update_system_config = false;

        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::new();

        for state_update in updates {
            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
                update_system_config = true;
            }

            // The ordering below ensures parsing and builtin-table rendering
            // always run while the affected object is present in `self`:
            // retractions parse/render *before* the state change, additions
            // *after* it.
            match state_update.diff {
                StateDiff::Retraction => {
                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }

                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));
                    self.apply_update(
                        state_update.kind,
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                }
                StateDiff::Addition => {
                    self.apply_update(
                        state_update.kind.clone(),
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));

                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }
                }
            }
        }

        if update_system_config {
            self.system_configuration.dyncfg_updates();
        }

        Ok((builtin_table_updates, catalog_updates))
    }
266
    /// Applies a single state update to `self` by dispatching on the update
    /// kind to the appropriate `apply_*_update` helper.
    ///
    /// Only item updates are currently fallible; every other kind always
    /// succeeds (or panics on a broken invariant inside its helper).
    #[instrument(level = "debug")]
    fn apply_update(
        &mut self,
        kind: StateUpdateKind,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<(), CatalogError> {
        match kind {
            StateUpdateKind::Role(role) => {
                self.apply_role_update(role, diff, retractions);
            }
            StateUpdateKind::RoleAuth(role_auth) => {
                self.apply_role_auth_update(role_auth, diff, retractions);
            }
            StateUpdateKind::Database(database) => {
                self.apply_database_update(database, diff, retractions);
            }
            StateUpdateKind::Schema(schema) => {
                self.apply_schema_update(schema, diff, retractions);
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                self.apply_default_privilege_update(default_privilege, diff, retractions);
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                self.apply_system_privilege_update(system_privilege, diff, retractions);
            }
            StateUpdateKind::SystemConfiguration(system_configuration) => {
                self.apply_system_configuration_update(system_configuration, diff, retractions);
            }
            StateUpdateKind::Cluster(cluster) => {
                self.apply_cluster_update(cluster, diff, retractions);
            }
            StateUpdateKind::NetworkPolicy(network_policy) => {
                self.apply_network_policy_update(network_policy, diff, retractions);
            }
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.apply_introspection_source_index_update(
                    introspection_source_index,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => {
                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
            }
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                self.apply_system_object_mapping_update(
                    system_object_mapping,
                    diff,
                    retractions,
                    local_expression_cache,
                );
            }
            StateUpdateKind::TemporaryItem(item) => {
                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
            }
            StateUpdateKind::Item(item) => {
                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
            }
            StateUpdateKind::Comment(comment) => {
                self.apply_comment_update(comment, diff, retractions);
            }
            StateUpdateKind::SourceReferences(source_reference) => {
                self.apply_source_references_update(source_reference, diff, retractions);
            }
            StateUpdateKind::AuditLog(_audit_log) => {
                // Audit log entries update no in-memory state here.
            }
            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
                self.apply_storage_collection_metadata_update(
                    storage_collection_metadata,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
            }
        }

        Ok(())
    }
350
351 #[instrument(level = "debug")]
352 fn apply_role_auth_update(
353 &mut self,
354 role_auth: mz_catalog::durable::RoleAuth,
355 diff: StateDiff,
356 retractions: &mut InProgressRetractions,
357 ) {
358 apply_with_update(
359 &mut self.role_auth_by_id,
360 role_auth,
361 |role_auth| role_auth.role_id,
362 diff,
363 &mut retractions.role_auths,
364 );
365 }
366
367 #[instrument(level = "debug")]
368 fn apply_role_update(
369 &mut self,
370 role: mz_catalog::durable::Role,
371 diff: StateDiff,
372 retractions: &mut InProgressRetractions,
373 ) {
374 apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
375 apply_with_update(
376 &mut self.roles_by_id,
377 role,
378 |role| role.id,
379 diff,
380 &mut retractions.roles,
381 );
382 }
383
384 #[instrument(level = "debug")]
385 fn apply_database_update(
386 &mut self,
387 database: mz_catalog::durable::Database,
388 diff: StateDiff,
389 retractions: &mut InProgressRetractions,
390 ) {
391 apply_inverted_lookup(
392 &mut self.database_by_name,
393 &database.name,
394 database.id,
395 diff,
396 );
397 apply_with_update(
398 &mut self.database_by_id,
399 database,
400 |database| database.id,
401 diff,
402 &mut retractions.databases,
403 );
404 }
405
406 #[instrument(level = "debug")]
407 fn apply_schema_update(
408 &mut self,
409 schema: mz_catalog::durable::Schema,
410 diff: StateDiff,
411 retractions: &mut InProgressRetractions,
412 ) {
413 match &schema.database_id {
414 Some(database_id) => {
415 let db = self
416 .database_by_id
417 .get_mut(database_id)
418 .expect("catalog out of sync");
419 apply_inverted_lookup(&mut db.schemas_by_name, &schema.name, schema.id, diff);
420 apply_with_update(
421 &mut db.schemas_by_id,
422 schema,
423 |schema| schema.id,
424 diff,
425 &mut retractions.schemas,
426 );
427 }
428 None => {
429 apply_inverted_lookup(
430 &mut self.ambient_schemas_by_name,
431 &schema.name,
432 schema.id,
433 diff,
434 );
435 apply_with_update(
436 &mut self.ambient_schemas_by_id,
437 schema,
438 |schema| schema.id,
439 diff,
440 &mut retractions.schemas,
441 );
442 }
443 }
444 }
445
446 #[instrument(level = "debug")]
447 fn apply_default_privilege_update(
448 &mut self,
449 default_privilege: mz_catalog::durable::DefaultPrivilege,
450 diff: StateDiff,
451 _retractions: &mut InProgressRetractions,
452 ) {
453 match diff {
454 StateDiff::Addition => Arc::make_mut(&mut self.default_privileges)
455 .grant(default_privilege.object, default_privilege.acl_item),
456 StateDiff::Retraction => Arc::make_mut(&mut self.default_privileges)
457 .revoke(&default_privilege.object, &default_privilege.acl_item),
458 }
459 }
460
461 #[instrument(level = "debug")]
462 fn apply_system_privilege_update(
463 &mut self,
464 system_privilege: MzAclItem,
465 diff: StateDiff,
466 _retractions: &mut InProgressRetractions,
467 ) {
468 match diff {
469 StateDiff::Addition => {
470 Arc::make_mut(&mut self.system_privileges).grant(system_privilege)
471 }
472 StateDiff::Retraction => {
473 Arc::make_mut(&mut self.system_privileges).revoke(&system_privilege)
474 }
475 }
476 }
477
478 #[instrument(level = "debug")]
479 fn apply_system_configuration_update(
480 &mut self,
481 system_configuration: mz_catalog::durable::SystemConfiguration,
482 diff: StateDiff,
483 _retractions: &mut InProgressRetractions,
484 ) {
485 let res = match diff {
486 StateDiff::Addition => self.insert_system_configuration(
487 &system_configuration.name,
488 VarInput::Flat(&system_configuration.value),
489 ),
490 StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
491 };
492 match res {
493 Ok(_) => (),
494 Err(Error {
498 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
499 }) => {
500 warn!(%name, "unknown system parameter from catalog storage");
501 }
502 Err(e) => panic!("unable to update system variable: {e:?}"),
503 }
504 }
505
506 #[instrument(level = "debug")]
507 fn apply_cluster_update(
508 &mut self,
509 cluster: mz_catalog::durable::Cluster,
510 diff: StateDiff,
511 retractions: &mut InProgressRetractions,
512 ) {
513 apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
514 apply_with_update(
515 &mut self.clusters_by_id,
516 cluster,
517 |cluster| cluster.id,
518 diff,
519 &mut retractions.clusters,
520 );
521 }
522
523 #[instrument(level = "debug")]
524 fn apply_network_policy_update(
525 &mut self,
526 policy: mz_catalog::durable::NetworkPolicy,
527 diff: StateDiff,
528 retractions: &mut InProgressRetractions,
529 ) {
530 apply_inverted_lookup(
531 &mut self.network_policies_by_name,
532 &policy.name,
533 policy.id,
534 diff,
535 );
536 apply_with_update(
537 &mut self.network_policies_by_id,
538 policy,
539 |policy| policy.id,
540 diff,
541 &mut retractions.network_policies,
542 );
543 }
544
    /// Applies a durable introspection source index update (the per-cluster
    /// indexes over builtin logs) to the in-memory state.
    #[instrument(level = "debug")]
    fn apply_introspection_source_index_update(
        &mut self,
        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        let cluster = self
            .clusters_by_id
            .get_mut(&introspection_source_index.cluster_id)
            .expect("catalog out of sync");
        // Resolve the builtin log that this index is built over by name.
        let log = BUILTIN_LOG_LOOKUP
            .get(introspection_source_index.name.as_str())
            .expect("missing log");
        // Keep the cluster's log-variant -> index-id mapping in sync for both
        // additions and retractions.
        apply_inverted_lookup(
            &mut cluster.log_indexes,
            &log.variant,
            introspection_source_index.index_id,
            diff,
        );

        match diff {
            StateDiff::Addition => {
                if let Some(mut entry) = retractions
                    .introspection_source_indexes
                    .remove(&introspection_source_index.item_id)
                {
                    // Addition paired with an earlier retraction in the same
                    // batch: rebuild the index item but keep the entry's
                    // identity, asserting that id/oid/name are unchanged.
                    let (index_name, index) = self.create_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.index_id,
                    );
                    assert_eq!(entry.id, introspection_source_index.item_id);
                    assert_eq!(entry.oid, introspection_source_index.oid);
                    assert_eq!(entry.name, index_name);
                    entry.item = index;
                    self.insert_entry(entry);
                } else {
                    // Brand-new introspection source index.
                    self.insert_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.item_id,
                        introspection_source_index.index_id,
                        introspection_source_index.oid,
                    );
                }
            }
            StateDiff::Retraction => {
                // Drop the entry but stash it so a paired addition in the same
                // batch can reuse it (see above).
                let entry = self.drop_item(introspection_source_index.item_id);
                retractions
                    .introspection_source_indexes
                    .insert(entry.id, entry);
            }
        }
    }
602
    /// Applies a durable cluster-replica update to the in-memory state of the
    /// owning cluster.
    #[instrument(level = "debug")]
    fn apply_cluster_replica_update(
        &mut self,
        cluster_replica: mz_catalog::durable::ClusterReplica,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        // First a shared borrow of the cluster: `concretize_replica_location`
        // needs `&self` while `azs` borrows from the cluster.
        let cluster = self
            .clusters_by_id
            .get(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        let azs = cluster.availability_zones();
        // NOTE(review): the empty `&vec![]` argument presumably means "no
        // extra location constraints" — confirm against
        // `concretize_replica_location`'s signature.
        let location = self
            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
            .expect("catalog in unexpected state");
        // Re-fetch the cluster mutably now that the shared borrows above have
        // ended.
        let cluster = self
            .clusters_by_id
            .get_mut(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        // Keep the replica name -> id index in sync for both diffs.
        apply_inverted_lookup(
            &mut cluster.replica_id_by_name_,
            &cluster_replica.name,
            cluster_replica.replica_id,
            diff,
        );
        match diff {
            StateDiff::Retraction => {
                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
                assert!(
                    prev.is_some(),
                    "retraction does not match existing value: {:?}",
                    cluster_replica.replica_id
                );
            }
            StateDiff::Addition => {
                // Build the in-memory replica config from the durable logging
                // settings and the concretized location computed above.
                let logging = ReplicaLogging {
                    log_logging: cluster_replica.config.logging.log_logging,
                    interval: cluster_replica.config.logging.interval,
                };
                let config = ReplicaConfig {
                    location,
                    compute: ComputeReplicaConfig { logging },
                };
                let mem_cluster_replica = ClusterReplica {
                    name: cluster_replica.name.clone(),
                    cluster_id: cluster_replica.cluster_id,
                    replica_id: cluster_replica.replica_id,
                    config,
                    owner_id: cluster_replica.owner_id,
                };
                let prev = cluster
                    .replicas_by_id_
                    .insert(cluster_replica.replica_id, mem_cluster_replica);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value: {:?}",
                    cluster_replica.replica_id
                );
            }
        }
    }
664
    /// Applies a durable system-object-mapping update, materializing (or
    /// removing) the in-memory catalog entry for a builtin object.
    ///
    /// For additions, the entry is built from the builtin's static definition
    /// in `BUILTIN_LOOKUP`; for builtins defined by SQL (indexes, materialized
    /// views, continual tasks, connections), the SQL is parsed via
    /// `parse_item` and any failure panics with a bootstrap error.
    #[instrument(level = "debug")]
    fn apply_system_object_mapping_update(
        &mut self,
        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        let item_id = system_object_mapping.unique_identifier.catalog_id;
        let global_id = system_object_mapping.unique_identifier.global_id;

        // NOTE(review): runtime-alterable builtins are skipped here entirely —
        // presumably their in-memory state is maintained through another code
        // path; confirm before relying on this.
        if system_object_mapping.unique_identifier.runtime_alterable() {
            return;
        }

        // Retraction: drop the entry but stash it so a paired addition in the
        // same batch can reuse it.
        if let StateDiff::Retraction = diff {
            let entry = self.drop_item(item_id);
            retractions.system_object_mappings.insert(item_id, entry);
            return;
        }

        // Addition paired with an earlier retraction: reinsert the previously
        // removed entry as-is.
        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
            self.insert_entry(entry);
            return;
        }

        // Fresh addition: build the entry from the builtin's static
        // definition. All builtins live in ambient schemas.
        let builtin = BUILTIN_LOOKUP
            .get(&system_object_mapping.description)
            .expect("missing builtin")
            .1;
        let schema_name = builtin.schema();
        let schema_id = self
            .ambient_schemas_by_name
            .get(schema_name)
            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
        let name = QualifiedItemName {
            qualifiers: ItemQualifiers {
                database_spec: ResolvedDatabaseSpecifier::Ambient,
                schema_spec: SchemaSpecifier::Id(*schema_id),
            },
            item: builtin.name().into(),
        };
        match builtin {
            // Logs: owned by mz_system, with the builtin's extra access grants.
            Builtin::Log(log) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&log.access);
                self.insert_item(
                    item_id,
                    log.oid,
                    name.clone(),
                    CatalogItem::Log(Log {
                        variant: log.variant,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }

            // Tables: built directly from the builtin's static descriptor; no
            // SQL to parse.
            Builtin::Table(table) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Table,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&table.access);

                self.insert_item(
                    item_id,
                    table.oid,
                    name.clone(),
                    CatalogItem::Table(Table {
                        create_sql: None,
                        desc: VersionedRelationDesc::new(table.desc.clone()),
                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                        conn_id: None,
                        resolved_ids: ResolvedIds::empty(),
                        // Retained-metrics objects keep data for the
                        // configured metrics retention window.
                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: table.is_retained_metrics_object,
                        data_source: TableDataSource::TableWrites {
                            defaults: vec![Expr::null(); table.desc.arity()],
                        },
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            // Indexes: defined by SQL, parsed at bootstrap.
            Builtin::Index(index) => {
                let custom_logical_compaction_window =
                    index.is_retained_metrics_object.then(|| {
                        self.system_config()
                            .metrics_retention()
                            .try_into()
                            .expect("invalid metrics retention")
                    });
                // Builtin objects have no historical versions.
                let versions = BTreeMap::new();

                let item = self
                    .parse_item(
                        global_id,
                        &index.create_sql(),
                        &versions,
                        None,
                        index.is_retained_metrics_object,
                        custom_logical_compaction_window,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap index:\n\
                            {}\n\
                            error:\n\
                            {:?}\n\n\
                            make sure that the schema name is specified in the builtin index's create sql statement.",
                            index.name, e
                        )
                    });
                let CatalogItem::Index(_) = item else {
                    panic!(
                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
                        index.name
                    );
                };

                self.insert_item(
                    item_id,
                    index.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }
            Builtin::View(_) => {
                // Builtin views are not applied through this path.
                unreachable!("views added elsewhere");
            }

            Builtin::Type(typ) => {
                let typ = self.resolve_builtin_type_references(typ);
                // An array type links back from its element type: record this
                // array's id on the element's type details.
                if let CatalogType::Array { element_reference } = typ.details.typ {
                    let entry = self.get_entry_mut(&element_reference);
                    let item_type = match &mut entry.item {
                        CatalogItem::Type(item_type) => item_type,
                        _ => unreachable!("types can only reference other types"),
                    };
                    item_type.details.array_id = Some(item_id);
                }

                let schema_id = self.resolve_system_schema(typ.schema);

                self.insert_item(
                    item_id,
                    typ.oid,
                    QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(schema_id),
                        },
                        item: typ.name.to_owned(),
                    },
                    CatalogItem::Type(Type {
                        create_sql: None,
                        global_id,
                        details: typ.details.clone(),
                        resolved_ids: ResolvedIds::empty(),
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(vec![
                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
                    ]),
                );
            }

            Builtin::Func(func) => {
                // NOTE(review): builtin functions are inserted with
                // INVALID_OID here — presumably their real OIDs live on the
                // individual function overloads; confirm.
                let oid = INVALID_OID;
                self.insert_item(
                    item_id,
                    oid,
                    name.clone(),
                    CatalogItem::Func(Func {
                        inner: func.inner,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }

            // Sources (builtin storage collections): built from the static
            // descriptor; no SQL to parse.
            Builtin::Source(coll) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&coll.access);

                self.insert_item(
                    item_id,
                    coll.oid,
                    name.clone(),
                    CatalogItem::Source(Source {
                        create_sql: None,
                        data_source: coll.data_source.clone(),
                        desc: coll.desc.clone(),
                        global_id,
                        timeline: Timeline::EpochMilliseconds,
                        resolved_ids: ResolvedIds::empty(),
                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: coll.is_retained_metrics_object,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            // Materialized views: defined by SQL, parsed at bootstrap.
            Builtin::MaterializedView(mv) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::MaterializedView,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&mv.access);
                // Builtin objects have no historical versions.
                let versions = BTreeMap::new();

                let mut item = self
                    .parse_item(
                        global_id,
                        &mv.create_sql(),
                        &versions,
                        None,
                        false,
                        None,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap materialized view:\n\
                            {}\n\
                            error:\n\
                            {e:?}\n\n\
                            make sure that the schema name is specified in the builtin \
                            materialized view's create sql statement.",
                            mv.name,
                        )
                    });
                let CatalogItem::MaterializedView(catalog_mv) = &mut item else {
                    panic!(
                        "internal error: builtin materialized view {}'s SQL does not begin \
                        with \"CREATE MATERIALIZED VIEW\".",
                        mv.name,
                    );
                };

                // Overlay the keys declared on the builtin's static descriptor
                // onto the descriptor produced by parsing the SQL.
                let mut desc = catalog_mv.desc.latest();
                for key in &mv.desc.typ().keys {
                    desc = desc.with_key(key.clone());
                }
                catalog_mv.desc = VersionedRelationDesc::new(desc);

                self.insert_item(
                    item_id,
                    mv.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            // Continual tasks: defined by SQL, parsed at bootstrap.
            Builtin::ContinualTask(ct) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&ct.access);
                // Builtin objects have no historical versions.
                let versions = BTreeMap::new();

                let item = self
                    .parse_item(
                        global_id,
                        &ct.create_sql(),
                        &versions,
                        None,
                        false,
                        None,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap continual task:\n\
                            {}\n\
                            error:\n\
                            {:?}\n\n\
                            make sure that the schema name is specified in the builtin continual task's create sql statement.",
                            ct.name, e
                        )
                    });
                let CatalogItem::ContinualTask(_) = &item else {
                    panic!(
                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
                        ct.name
                    );
                };

                self.insert_item(
                    item_id,
                    ct.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            // Connections: defined by SQL; unlike other builtins, ownership
            // and privileges come from the builtin definition itself.
            Builtin::Connection(connection) => {
                // Builtin objects have no historical versions.
                let versions = BTreeMap::new();
                let mut item = self
                    .parse_item(
                        global_id,
                        connection.sql,
                        &versions,
                        None,
                        false,
                        None,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap connection:\n\
                            {}\n\
                            error:\n\
                            {:?}\n\n\
                            make sure that the schema name is specified in the builtin connection's create sql statement.",
                            connection.name, e
                        )
                    });
                let CatalogItem::Connection(_) = &mut item else {
                    panic!(
                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
                        connection.name
                    );
                };

                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    connection.owner_id.clone(),
                )];
                acl_items.extend_from_slice(connection.access);

                self.insert_item(
                    item_id,
                    connection.oid,
                    name.clone(),
                    item,
                    connection.owner_id.clone(),
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
        }
    }
1060
    /// Applies a temporary (connection-scoped) item update to the in-memory
    /// state.
    ///
    /// On addition, the owning connection's temporary schema is created on
    /// demand, and an entry retracted earlier in the same batch is reused
    /// where possible — including skipping re-deserialization when the
    /// `create_sql` is unchanged.
    #[instrument(level = "debug")]
    fn apply_temporary_item_update(
        &mut self,
        temporary_item: TemporaryItem,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        match diff {
            StateDiff::Addition => {
                let TemporaryItem {
                    id,
                    oid,
                    global_id,
                    schema_id,
                    name,
                    conn_id,
                    create_sql,
                    owner_id,
                    privileges,
                    extra_versions,
                } = temporary_item;
                let temp_conn_id = conn_id
                    .as_ref()
                    .expect("temporary items must have a connection id");
                // Lazily create the connection's temporary schema the first
                // time one of its temp items is applied.
                if !self.temporary_schemas.contains_key(temp_conn_id) {
                    self.create_temporary_schema(temp_conn_id, owner_id)
                        .expect("failed to create temporary schema");
                }
                let schema = self.find_temp_schema(&schema_id);
                let name = QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: schema.database().clone(),
                        schema_spec: schema.id().clone(),
                    },
                    item: name.clone(),
                };

                let entry = match retractions.temp_items.remove(&id) {
                    Some(mut retraction) => {
                        assert_eq!(retraction.id, id);

                        // Only re-deserialize when the SQL actually changed;
                        // otherwise the retracted item is reused verbatim.
                        if retraction.create_sql() != create_sql {
                            let mut catalog_item = self
                                .deserialize_item(
                                    global_id,
                                    &create_sql,
                                    &extra_versions,
                                    local_expression_cache,
                                    Some(retraction.item),
                                )
                                .unwrap_or_else(|e| {
                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
                                });
                            // Re-stamp the connection id, which is not part of
                            // the serialized SQL.
                            catalog_item.set_conn_id(conn_id);
                            retraction.item = catalog_item;
                        }

                        // Refresh the identity/metadata fields, which the
                        // addition may have changed (re-key).
                        retraction.id = id;
                        retraction.oid = oid;
                        retraction.name = name;
                        retraction.owner_id = owner_id;
                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
                        retraction
                    }
                    None => {
                        // No paired retraction: build the entry from scratch.
                        let mut catalog_item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                None,
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });

                        // Re-stamp the connection id, which is not part of the
                        // serialized SQL.
                        catalog_item.set_conn_id(conn_id);

                        CatalogEntry {
                            item: catalog_item,
                            referenced_by: Vec::new(),
                            used_by: Vec::new(),
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                        }
                    }
                };
                self.insert_entry(entry);
            }
            StateDiff::Retraction => {
                // Drop the entry but stash it so a paired addition in the same
                // batch can reuse it.
                let entry = self.drop_item(temporary_item.id);
                retractions.temp_items.insert(temporary_item.id, entry);
            }
        }
    }
1178
    /// Applies a durable (non-temporary) item update to the in-memory state.
    ///
    /// On addition, an entry retracted earlier in the same batch is reused:
    /// its parsed item seeds deserialization and its `referenced_by`/`used_by`
    /// dependency links are carried over.
    #[instrument(level = "debug")]
    fn apply_item_update(
        &mut self,
        item: mz_catalog::durable::Item,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<(), CatalogError> {
        match diff {
            StateDiff::Addition => {
                // Capture the durable key before destructuring moves the fields.
                let key = item.key();
                let mz_catalog::durable::Item {
                    id,
                    oid,
                    global_id,
                    schema_id,
                    name,
                    create_sql,
                    owner_id,
                    privileges,
                    extra_versions,
                } = item;
                let schema = self.find_non_temp_schema(&schema_id);
                let name = QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: schema.database().clone(),
                        schema_spec: schema.id().clone(),
                    },
                    item: name.clone(),
                };
                let entry = match retractions.items.remove(&key) {
                    Some(retraction) => {
                        assert_eq!(retraction.id, item.id);

                        // Re-deserialize the SQL, seeding with the previously
                        // parsed item so unchanged parts can be reused.
                        let item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                Some(retraction.item),
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });

                        CatalogEntry {
                            item,
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                            // Preserve dependency links from the retracted entry.
                            referenced_by: retraction.referenced_by,
                            used_by: retraction.used_by,
                        }
                    }
                    None => {
                        // No paired retraction: build the entry from scratch.
                        let catalog_item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                None,
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });
                        CatalogEntry {
                            item: catalog_item,
                            referenced_by: Vec::new(),
                            used_by: Vec::new(),
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                        }
                    }
                };

                self.insert_entry(entry);
            }
            StateDiff::Retraction => {
                // Drop the entry but stash it (keyed by the durable key) so a
                // paired addition in the same batch can reuse it.
                let entry = self.drop_item(item.id);
                let key = item.into_key_value().0;
                retractions.items.insert(key, entry);
            }
        }
        Ok(())
    }
1271
1272 #[instrument(level = "debug")]
1273 fn apply_comment_update(
1274 &mut self,
1275 comment: mz_catalog::durable::Comment,
1276 diff: StateDiff,
1277 _retractions: &mut InProgressRetractions,
1278 ) {
1279 match diff {
1280 StateDiff::Addition => {
1281 let prev = Arc::make_mut(&mut self.comments).update_comment(
1282 comment.object_id,
1283 comment.sub_component,
1284 Some(comment.comment),
1285 );
1286 assert_eq!(
1287 prev, None,
1288 "values must be explicitly retracted before inserting a new value"
1289 );
1290 }
1291 StateDiff::Retraction => {
1292 let prev = Arc::make_mut(&mut self.comments).update_comment(
1293 comment.object_id,
1294 comment.sub_component,
1295 None,
1296 );
1297 assert_eq!(
1298 prev,
1299 Some(comment.comment),
1300 "retraction does not match existing value: ({:?}, {:?})",
1301 comment.object_id,
1302 comment.sub_component,
1303 );
1304 }
1305 }
1306 }
1307
1308 #[instrument(level = "debug")]
1309 fn apply_source_references_update(
1310 &mut self,
1311 source_references: mz_catalog::durable::SourceReferences,
1312 diff: StateDiff,
1313 _retractions: &mut InProgressRetractions,
1314 ) {
1315 match diff {
1316 StateDiff::Addition => {
1317 let prev = self
1318 .source_references
1319 .insert(source_references.source_id, source_references.into());
1320 assert!(
1321 prev.is_none(),
1322 "values must be explicitly retracted before inserting a new value: {prev:?}"
1323 );
1324 }
1325 StateDiff::Retraction => {
1326 let prev = self.source_references.remove(&source_references.source_id);
1327 assert!(
1328 prev.is_some(),
1329 "retraction for a non-existent existing value: {source_references:?}"
1330 );
1331 }
1332 }
1333 }
1334
1335 #[instrument(level = "debug")]
1336 fn apply_storage_collection_metadata_update(
1337 &mut self,
1338 storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1339 diff: StateDiff,
1340 _retractions: &mut InProgressRetractions,
1341 ) {
1342 apply_inverted_lookup(
1343 &mut Arc::make_mut(&mut self.storage_metadata).collection_metadata,
1344 &storage_collection_metadata.id,
1345 storage_collection_metadata.shard,
1346 diff,
1347 );
1348 }
1349
1350 #[instrument(level = "debug")]
1351 fn apply_unfinalized_shard_update(
1352 &mut self,
1353 unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1354 diff: StateDiff,
1355 _retractions: &mut InProgressRetractions,
1356 ) {
1357 match diff {
1358 StateDiff::Addition => {
1359 let newly_inserted = Arc::make_mut(&mut self.storage_metadata)
1360 .unfinalized_shards
1361 .insert(unfinalized_shard.shard);
1362 assert!(
1363 newly_inserted,
1364 "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1365 );
1366 }
1367 StateDiff::Retraction => {
1368 let removed = Arc::make_mut(&mut self.storage_metadata)
1369 .unfinalized_shards
1370 .remove(&unfinalized_shard.shard);
1371 assert!(
1372 removed,
1373 "retraction does not match existing value: {unfinalized_shard:?}"
1374 );
1375 }
1376 }
1377 }
1378
1379 #[instrument]
1382 pub(crate) fn generate_builtin_table_updates(
1383 &self,
1384 updates: Vec<StateUpdate>,
1385 ) -> Vec<BuiltinTableUpdate> {
1386 let mut builtin_table_updates = Vec::new();
1387 for StateUpdate { kind, ts: _, diff } in updates {
1388 let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1389 let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1390 builtin_table_updates.extend(builtin_table_update);
1391 }
1392 builtin_table_updates
1393 }
1394
    /// Maps a single state update to the builtin table rows it implies.
    ///
    /// Kinds with no builtin table representation (e.g. system configuration,
    /// databases, schemas) produce an empty vector.
    #[instrument(level = "debug")]
    pub(crate) fn generate_builtin_table_update(
        &self,
        kind: StateUpdateKind,
        diff: StateDiff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        // Convert the state diff into the builtin-table diff representation.
        let diff = diff.into();
        match kind {
            StateUpdateKind::Role(role) => self.pack_role_update(role.id, diff),
            StateUpdateKind::RoleAuth(role_auth) => {
                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                vec![self.pack_default_privileges_update(
                    &default_privilege.object,
                    &default_privilege.acl_item.grantee,
                    &default_privilege.acl_item.acl_mode,
                    diff,
                )]
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                vec![self.pack_system_privileges_update(system_privilege, diff)]
            }
            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.pack_item_update(introspection_source_index.item_id, diff)
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
                cluster_replica.cluster_id,
                &cluster_replica.name,
                diff,
            ),
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                // Runtime-alterable system objects are skipped here; their
                // rows are presumably produced elsewhere — see the Item path.
                if !system_object_mapping.unique_identifier.runtime_alterable() {
                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
                } else {
                    vec![]
                }
            }
            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
                comment.object_id,
                comment.sub_component,
                &comment.comment,
                diff,
            )],
            StateUpdateKind::SourceReferences(source_references) => {
                self.pack_source_references_update(&source_references, diff)
            }
            StateUpdateKind::AuditLog(audit_log) => {
                vec![
                    self.pack_audit_log_update(&audit_log.event, diff)
                        .expect("could not pack audit log update"),
                ]
            }
            // These kinds have no builtin table representation.
            StateUpdateKind::Database(_)
            | StateUpdateKind::Schema(_)
            | StateUpdateKind::NetworkPolicy(_)
            | StateUpdateKind::StorageCollectionMetadata(_)
            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
        }
    }
1464
1465 fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1466 self.entry_by_id
1467 .get_mut(id)
1468 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1469 }
1470
    /// Returns a mutable reference to the schema addressed by
    /// (`database_spec`, `schema_spec`).
    ///
    /// `conn_id` is only consulted for temporary schemas, which are scoped to
    /// a connection. Panics if the schema is missing ("catalog out of sync")
    /// or if a temporary schema is requested inside a named database, which
    /// is impossible: temporary schemas live only in the ambient database.
    fn get_schema_mut(
        &mut self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &mut Schema {
        match (database_spec, schema_spec) {
            // Connection-scoped temporary schema.
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
                .temporary_schemas
                .get_mut(conn_id)
                .expect("catalog out of sync"),
            // Ambient (database-less) schema, e.g. pg_catalog-style schemas.
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
                .ambient_schemas_by_id
                .get_mut(id)
                .expect("catalog out of sync"),
            // Ordinary schema inside a named database.
            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
                .database_by_id
                .get_mut(database_id)
                .expect("catalog out of sync")
                .schemas_by_id
                .get_mut(schema_id)
                .expect("catalog out of sync"),
            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
                unreachable!("temporary schemas are in the ambient database")
            }
        }
    }
1499
    /// Parses and installs the given builtin views, resolving their mutual
    /// dependencies by retrying views whose dependencies have not been
    /// installed yet.
    ///
    /// Views whose previous entries were retracted earlier in the batch are
    /// re-inserted directly; the rest are parsed on blocking tasks. A view
    /// that fails to plan because of a missing dependency is parked (keyed by
    /// the missing ID or name) until that dependency completes. Returns the
    /// builtin table updates for all installed views.
    ///
    /// Panics if any view fails for a reason other than a resolvable missing
    /// dependency, or if dependencies cannot be fully resolved.
    #[instrument(name = "catalog::parse_views")]
    async fn parse_builtin_views(
        state: &mut CatalogState,
        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
        // Split views into ones with a pending retraction (re-insert the old
        // entry directly) and genuinely new views (parse below).
        let (updates, additions): (Vec<_>, Vec<_>) =
            builtin_views
                .into_iter()
                .partition_map(|(view, item_id, gid)| {
                    match retractions.system_object_mappings.remove(&item_id) {
                        Some(entry) => Either::Left(entry),
                        None => Either::Right((view, item_id, gid)),
                    }
                });

        for entry in updates {
            let item_id = entry.id();
            state.insert_entry(entry);
            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
        }

        // In-flight parse tasks.
        let mut handles = Vec::new();
        // Views parked on a dependency identified by catalog item ID.
        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
            BTreeMap::new();
        // Views parked on a dependency identified by full name.
        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
        // Views parked until everything currently in flight finishes
        // (used for the InvalidCast case below).
        let mut awaiting_all = Vec::new();
        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
        let mut completed_names: BTreeSet<String> = BTreeSet::new();

        // Views not yet installed, keyed by item ID.
        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
            .into_iter()
            .map(|(view, item_id, gid)| (item_id, (view, gid)))
            .collect();
        let item_ids: Vec<_> = views.keys().copied().collect();

        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
            // Nothing in flight and nothing ready: release the views waiting
            // on "all" and retry them.
            if handles.is_empty() && ready.is_empty() {
                ready.extend(awaiting_all.drain(..));
            }

            if !ready.is_empty() {
                // Snapshot the state so parse tasks can run concurrently with
                // a consistent view of the catalog.
                let spawn_state = Arc::new(state.clone());
                while let Some(id) = ready.pop_front() {
                    let (view, global_id) = views.get(&id).expect("must exist");
                    let global_id = *global_id;
                    let create_sql = view.create_sql();
                    let versions = BTreeMap::new();

                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
                    let task_state = Arc::clone(&spawn_state);
                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
                    // Parsing is CPU-bound; run it on a blocking task.
                    let handle = mz_ore::task::spawn_blocking(
                        || "parse view",
                        move || {
                            span.in_scope(|| {
                                let res = task_state.parse_item_inner(
                                    global_id,
                                    &create_sql,
                                    &versions,
                                    None,
                                    false,
                                    None,
                                    cached_expr,
                                    None,
                                );
                                (id, global_id, res)
                            })
                        },
                    );
                    handles.push(handle);
                }
            }

            // Wait for any one parse task to finish.
            let (selected, _idx, remaining) = future::select_all(handles).await;
            handles = remaining;
            let (id, global_id, res) = selected;
            // Put a cached expression back if the parse did not consume it.
            let mut insert_cached_expr = |cached_expr| {
                if let Some(cached_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
                }
            };
            match res {
                Ok((item, uncached_expr)) => {
                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                        local_expression_cache.insert_uncached_expression(
                            global_id,
                            uncached_expr,
                            optimizer_features,
                        );
                    }
                    let (view, _gid) = views.remove(&id).expect("must exist");
                    let schema_id = state
                        .ambient_schemas_by_name
                        .get(view.schema)
                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
                    let qname = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(*schema_id),
                        },
                        item: view.name.into(),
                    };
                    // System role owns builtin views; extend with the view's
                    // declared access list.
                    let mut acl_items = vec![rbac::owner_privilege(
                        mz_sql::catalog::ObjectType::View,
                        MZ_SYSTEM_ROLE_ID,
                    )];
                    acl_items.extend_from_slice(&view.access);

                    state.insert_item(
                        id,
                        view.oid,
                        qname,
                        item,
                        MZ_SYSTEM_ROLE_ID,
                        PrivilegeMap::from_mz_acl_items(acl_items),
                    );

                    // Wake up anything that was waiting on this view, whether
                    // it was parked by ID or by name.
                    let mut resolved_dependent_items = Vec::new();
                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    let entry = state.get_entry(&id);
                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    ready.extend(resolved_dependent_items);

                    completed_ids.insert(id);
                    completed_names.insert(full_name);
                }
                // Missing dependency identified by ID: park this view until
                // that ID completes (or retry now if it already has).
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    if completed_ids.contains(&missing_dep) {
                        ready.push_back(id);
                    } else {
                        awaiting_id_dependencies
                            .entry(missing_dep)
                            .or_default()
                            .push(id);
                    }
                }
                // Unknown item: the missing dependency may be reported either
                // as a parseable item ID or as a name.
                Err((
                    AdapterError::PlanError(plan::PlanError::Catalog(
                        SqlCatalogError::UnknownItem(missing_dep),
                    )),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    match CatalogItemId::from_str(&missing_dep) {
                        Ok(missing_dep) => {
                            if completed_ids.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_id_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                        Err(_) => {
                            if completed_names.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_name_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                    }
                }
                // InvalidCast carries no dependency info; retry once all
                // in-flight work has drained.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    awaiting_all.push(id);
                }
                // Any other error is an unrecoverable bootstrap failure.
                Err((e, _)) => {
                    let (bad_view, _gid) = views.get(&id).expect("must exist");
                    panic!(
                        "internal error: failed to load bootstrap view:\n\
                        {name}\n\
                        error:\n\
                        {e:?}\n\n\
                        Make sure that the schema name is specified in the builtin view's create sql statement.
                        ",
                        name = bad_view.name,
                    )
                }
            }
        }

        // All dependency queues must have drained; anything left means a
        // builtin view's dependencies could not be resolved.
        assert!(awaiting_id_dependencies.is_empty());
        assert!(
            awaiting_name_dependencies.is_empty(),
            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
        );
        assert!(awaiting_all.is_empty());
        assert!(views.is_empty());

        // Emit builtin table rows for every newly installed view.
        builtin_table_updates.extend(
            item_ids
                .into_iter()
                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
        );

        builtin_table_updates
    }
1744
1745 fn insert_entry(&mut self, entry: CatalogEntry) {
1747 if !entry.id.is_system() {
1748 if let Some(cluster_id) = entry.item.cluster_id() {
1749 self.clusters_by_id
1750 .get_mut(&cluster_id)
1751 .expect("catalog out of sync")
1752 .bound_objects
1753 .insert(entry.id);
1754 };
1755 }
1756
1757 for u in entry.references().items() {
1758 match self.entry_by_id.get_mut(u) {
1759 Some(metadata) => metadata.referenced_by.push(entry.id()),
1760 None => panic!(
1761 "Catalog: missing dependent catalog item {} while installing {}",
1762 &u,
1763 self.resolve_full_name(entry.name(), entry.conn_id())
1764 ),
1765 }
1766 }
1767 for u in entry.uses() {
1768 if u == entry.id() {
1771 continue;
1772 }
1773 match self.entry_by_id.get_mut(&u) {
1774 Some(metadata) => metadata.used_by.push(entry.id()),
1775 None => panic!(
1776 "Catalog: missing dependent catalog item {} while installing {}",
1777 &u,
1778 self.resolve_full_name(entry.name(), entry.conn_id())
1779 ),
1780 }
1781 }
1782 for gid in entry.item.global_ids() {
1783 self.entry_by_global_id.insert(gid, entry.id());
1784 }
1785 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1786 if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
1789 && !self.temporary_schemas.contains_key(conn_id)
1790 {
1791 self.create_temporary_schema(conn_id, entry.owner_id)
1792 .expect("failed to create temporary schema");
1793 }
1794 let schema = self.get_schema_mut(
1795 &entry.name().qualifiers.database_spec,
1796 &entry.name().qualifiers.schema_spec,
1797 conn_id,
1798 );
1799
1800 let prev_id = match entry.item() {
1801 CatalogItem::Func(_) => schema
1802 .functions
1803 .insert(entry.name().item.clone(), entry.id()),
1804 CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1805 _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1806 };
1807
1808 assert!(
1809 prev_id.is_none(),
1810 "builtin name collision on {:?}",
1811 entry.name().item.clone()
1812 );
1813
1814 self.entry_by_id.insert(entry.id(), entry.clone());
1815 }
1816
1817 fn insert_item(
1819 &mut self,
1820 id: CatalogItemId,
1821 oid: u32,
1822 name: QualifiedItemName,
1823 item: CatalogItem,
1824 owner_id: RoleId,
1825 privileges: PrivilegeMap,
1826 ) {
1827 let entry = CatalogEntry {
1828 item,
1829 name,
1830 id,
1831 oid,
1832 used_by: Vec::new(),
1833 referenced_by: Vec::new(),
1834 owner_id,
1835 privileges,
1836 };
1837
1838 self.insert_entry(entry);
1839 }
1840
    /// Removes the item `id` from every catalog index and returns its entry.
    ///
    /// Inverse of `insert_entry`: cleans up reverse-dependency lists, the
    /// global-ID index, the schema namespace, and (for user items) the
    /// owning cluster's bound-object set. Panics on any inconsistency
    /// ("catalog out of sync").
    #[mz_ore::instrument(level = "trace")]
    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
        // Unregister this item from the reverse-dependency lists of the items
        // it referenced or used.
        for u in metadata.references().items() {
            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
            }
        }
        for u in metadata.uses() {
            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
                dep_metadata.used_by.retain(|u| *u != metadata.id())
            }
        }
        // Drop every global-ID mapping for the item.
        for gid in metadata.global_ids() {
            self.entry_by_global_id.remove(&gid);
        }

        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema = self.get_schema_mut(
            &metadata.name().qualifiers.database_spec,
            &metadata.name().qualifiers.schema_spec,
            conn_id,
        );
        // Types live in their own namespace; the assert documents that
        // functions are never dropped through this path.
        if metadata.item_type() == CatalogItemType::Type {
            schema
                .types
                .remove(&metadata.name().item)
                .expect("catalog out of sync");
        } else {
            assert_ne!(metadata.item_type(), CatalogItemType::Func);

            schema
                .items
                .remove(&metadata.name().item)
                .expect("catalog out of sync");
        };

        // Keep the owning cluster's bound-object set in sync for user items.
        if !id.is_system() {
            if let Some(cluster_id) = metadata.item().cluster_id() {
                assert!(
                    self.clusters_by_id
                        .get_mut(&cluster_id)
                        .expect("catalog out of sync")
                        .bound_objects
                        .remove(&id),
                    "catalog out of sync"
                );
            }
        }

        metadata
    }
1895
1896 fn insert_introspection_source_index(
1897 &mut self,
1898 cluster_id: ClusterId,
1899 log: &'static BuiltinLog,
1900 item_id: CatalogItemId,
1901 global_id: GlobalId,
1902 oid: u32,
1903 ) {
1904 let (index_name, index) =
1905 self.create_introspection_source_index(cluster_id, log, global_id);
1906 self.insert_item(
1907 item_id,
1908 oid,
1909 index_name,
1910 index,
1911 MZ_SYSTEM_ROLE_ID,
1912 PrivilegeMap::default(),
1913 );
1914 }
1915
    /// Builds the qualified name and `CatalogItem` for the index over builtin
    /// log `log` on cluster `cluster_id`, without installing it.
    fn create_introspection_source_index(
        &self,
        cluster_id: ClusterId,
        log: &'static BuiltinLog,
        global_id: GlobalId,
    ) -> (QualifiedItemName, CatalogItem) {
        // Fully-qualified name of the log the index is built on.
        let source_name = FullItemName {
            database: RawDatabaseSpecifier::Ambient,
            schema: log.schema.into(),
            item: log.name.into(),
        };
        // The cluster ID is baked into the index name, since each cluster
        // gets its own introspection source indexes.
        let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
        let mut index_name = QualifiedItemName {
            qualifiers: ItemQualifiers {
                database_spec: ResolvedDatabaseSpecifier::Ambient,
                schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
            },
            item: index_name.clone(),
        };
        // Avoid collisions with any existing item of the same name.
        index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
        let index_item_name = index_name.item.clone();
        let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
        let index = CatalogItem::Index(Index {
            global_id,
            on: log_global_id,
            // Index keys come from the log variant's declared key columns.
            keys: log
                .variant
                .index_by()
                .into_iter()
                .map(MirScalarExpr::column)
                .collect(),
            // Synthesize the CREATE INDEX SQL for this index.
            create_sql: index_sql(
                index_item_name,
                cluster_id,
                source_name,
                &log.variant.desc(),
                &log.variant.index_by(),
            ),
            conn_id: None,
            resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
            cluster_id,
            is_retained_metrics_object: false,
            custom_logical_compaction_window: None,
        });
        (index_name, index)
    }
1962
1963 fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1968 Ok(Arc::make_mut(&mut self.system_configuration).set(name, value)?)
1969 }
1970
1971 fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1976 Ok(Arc::make_mut(&mut self.system_configuration).reset(name)?)
1977 }
1978}
1979
/// Sorts a batch of state updates into a safe apply order: all retractions
/// first (in reverse dependency order), then all additions (in dependency
/// order), with clusters, builtin objects, and items interleaved at the
/// phases the final chain below encodes.
fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
    /// Routes `update` into `retractions` or `additions` depending on `diff`.
    fn push_update<T>(
        update: T,
        diff: StateDiff,
        retractions: &mut Vec<T>,
        additions: &mut Vec<T>,
    ) {
        match diff {
            StateDiff::Retraction => retractions.push(update),
            StateDiff::Addition => additions.push(update),
        }
    }

    // Sanity checks: every update in the batch carries the same timestamp,
    // and no update kind appears twice (i.e. input is consolidated).
    soft_assert_no_log!(
        updates.iter().map(|update| update.ts).all_equal(),
        "all timestamps should be equal: {updates:?}"
    );
    soft_assert_no_log!(
        {
            let mut dedup = BTreeSet::new();
            updates.iter().all(|update| dedup.insert(&update.kind))
        },
        "updates should be consolidated: {updates:?}"
    );

    // Bucket updates by the phase in which they must be applied.
    let mut pre_cluster_retractions = Vec::new();
    let mut pre_cluster_additions = Vec::new();
    let mut cluster_retractions = Vec::new();
    let mut cluster_additions = Vec::new();
    let mut builtin_item_updates = Vec::new();
    let mut item_retractions = Vec::new();
    let mut item_additions = Vec::new();
    let mut temp_item_retractions = Vec::new();
    let mut temp_item_additions = Vec::new();
    let mut post_item_retractions = Vec::new();
    let mut post_item_additions = Vec::new();
    for update in updates {
        let diff = update.diff.clone();
        match update.kind {
            // Objects that must exist before clusters and items.
            StateUpdateKind::Role(_)
            | StateUpdateKind::RoleAuth(_)
            | StateUpdateKind::Database(_)
            | StateUpdateKind::Schema(_)
            | StateUpdateKind::DefaultPrivilege(_)
            | StateUpdateKind::SystemPrivilege(_)
            | StateUpdateKind::SystemConfiguration(_)
            | StateUpdateKind::NetworkPolicy(_) => push_update(
                update,
                diff,
                &mut pre_cluster_retractions,
                &mut pre_cluster_additions,
            ),
            // Cluster-level objects.
            StateUpdateKind::Cluster(_)
            | StateUpdateKind::IntrospectionSourceIndex(_)
            | StateUpdateKind::ClusterReplica(_) => push_update(
                update,
                diff,
                &mut cluster_retractions,
                &mut cluster_additions,
            ),
            // Builtin objects are collected first and sorted separately below.
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
            }
            // Items and temporary items keep their timestamp/diff so they can
            // be dependency-sorted and merged below.
            StateUpdateKind::TemporaryItem(item) => push_update(
                (item, update.ts, update.diff),
                diff,
                &mut temp_item_retractions,
                &mut temp_item_additions,
            ),
            StateUpdateKind::Item(item) => push_update(
                (item, update.ts, update.diff),
                diff,
                &mut item_retractions,
                &mut item_additions,
            ),
            // Everything that can safely go after items.
            StateUpdateKind::Comment(_)
            | StateUpdateKind::SourceReferences(_)
            | StateUpdateKind::AuditLog(_)
            | StateUpdateKind::StorageCollectionMetadata(_)
            | StateUpdateKind::UnfinalizedShard(_) => push_update(
                update,
                diff,
                &mut post_item_retractions,
                &mut post_item_additions,
            ),
        }
    }

    // Order builtin object updates by their position in the builtin lookup
    // table, which encodes a valid definition order.
    let builtin_item_updates = builtin_item_updates
        .into_iter()
        .map(|(system_object_mapping, ts, diff)| {
            let idx = BUILTIN_LOOKUP
                .get(&system_object_mapping.description)
                .expect("missing builtin")
                .0;
            (idx, system_object_mapping, ts, diff)
        })
        .sorted_by_key(|(idx, _, _, _)| *idx)
        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));

    // Builtin sources are split out from other builtin objects; the final
    // chain applies them in a different phase relative to clusters.
    let mut builtin_source_retractions = Vec::new();
    let mut builtin_source_additions = Vec::new();
    let mut other_builtin_retractions = Vec::new();
    let mut other_builtin_additions = Vec::new();
    for (builtin_item_update, ts, diff) in builtin_item_updates {
        let object_type = builtin_item_update.description.object_type;
        let update = StateUpdate {
            kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
            ts,
            diff,
        };
        if object_type == CatalogItemType::Source {
            push_update(
                update,
                diff,
                &mut builtin_source_retractions,
                &mut builtin_source_additions,
            );
        } else {
            push_update(
                update,
                diff,
                &mut other_builtin_retractions,
                &mut other_builtin_additions,
            );
        }
    }

    /// Topologically sorts `items` by the dependencies named in their
    /// `create_sql`, for item types that may depend on one another.
    fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
        tracing::debug!(?items, "sorting items by dependencies");

        let key_fn = |item: &(mz_catalog::durable::Item, _, _)| item.0.id;
        let dependencies_fn = |item: &(mz_catalog::durable::Item, _, _)| {
            // Re-parse the stored SQL to discover the item's dependencies.
            let statement = mz_sql::parse::parse(&item.0.create_sql)
                .expect("valid create_sql")
                .into_element()
                .ast;
            mz_sql::names::dependencies(&statement).expect("failed to find dependencies of item")
        };
        sort_topological(items, key_fn, dependencies_fn);
    }

    /// Orders durable item updates by type (types, funcs, secrets,
    /// connections, sources, tables, views/mat-views/indexes, sinks,
    /// continual tasks), topologically within connections and derived items,
    /// and by ID within the other groups.
    fn sort_item_updates(
        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in item_updates {
            match update.0.item_type() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        // Connections can reference other connections, and derived items can
        // reference each other, so these two groups need a topological sort.
        sort_items_topological(&mut connections);
        sort_items_topological(&mut derived_items);

        // The remaining groups only need a deterministic (ID) order.
        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut sources,
            &mut tables,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }

    let item_retractions = sort_item_updates(item_retractions);
    let item_additions = sort_item_updates(item_additions);

    /// Same grouping as `sort_item_updates`, but for in-memory temporary
    /// items, which are ordered by ID within every group (no topological
    /// sort is performed here).
    fn sort_temp_item_updates(
        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in temp_item_updates {
            match update.0.item_type() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut connections,
            &mut sources,
            &mut tables,
            &mut derived_items,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }
    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
    let temp_item_additions = sort_temp_item_updates(temp_item_additions);

    /// Merges the two pre-sorted update streams into one, ordering by item ID
    /// where the streams overlap; durable and temporary item IDs are disjoint
    /// (asserted in the unreachable branch).
    fn merge_item_updates(
        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> Vec<StateUpdate> {
        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());

        // Standard two-way merge on the front elements of both queues.
        while let (Some((item, _, _)), Some((temp_item, _, _))) =
            (item_updates.front(), temp_item_updates.front())
        {
            if item.id < temp_item.id {
                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::Item(item),
                    ts,
                    diff,
                });
            } else if item.id > temp_item.id {
                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::TemporaryItem(temp_item),
                    ts,
                    diff,
                });
            } else {
                unreachable!(
                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
                );
            }
        }

        // Drain whichever queue still has elements.
        while let Some((item, ts, diff)) = item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::Item(item),
                ts,
                diff,
            });
        }

        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::TemporaryItem(temp_item),
                ts,
                diff,
            });
        }

        state_updates
    }
    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
    let item_additions = merge_item_updates(item_additions, temp_item_additions);

    // Final order: retractions in reverse dependency order, then additions in
    // dependency order. Note the reversal (`.rev()`) of every retraction
    // phase, and that builtin sources sit on opposite sides of clusters in
    // the retraction vs. addition halves.
    iter::empty()
        .chain(post_item_retractions.into_iter().rev())
        .chain(item_retractions.into_iter().rev())
        .chain(other_builtin_retractions.into_iter().rev())
        .chain(cluster_retractions.into_iter().rev())
        .chain(builtin_source_retractions.into_iter().rev())
        .chain(pre_cluster_retractions.into_iter().rev())
        .chain(pre_cluster_additions)
        .chain(builtin_source_additions)
        .chain(cluster_additions)
        .chain(other_builtin_additions)
        .chain(item_additions)
        .chain(post_item_additions)
        .collect()
}
2357
/// The kind of batch currently being accumulated while applying a sorted
/// stream of state updates, so consecutive compatible updates can be applied
/// together.
enum ApplyState {
    /// Builtin view definitions to parse and install as one batch.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Item-like updates, applied with item-parsing configuration enabled.
    Items(Vec<StateUpdate>),
    /// All other updates, applied directly.
    Updates(Vec<StateUpdate>),
}
2374
2375impl ApplyState {
    /// Starts a new batch containing just `update`, choosing the batch kind
    /// from the update's kind (and, for builtin views, its diff).
    fn new(update: StateUpdate) -> Self {
        use StateUpdateKind::*;
        match &update.kind {
            // Additions of builtin views go into their own batch so they can
            // be parsed together with dependency resolution.
            SystemObjectMapping(som)
                if som.description.object_type == CatalogItemType::View
                    && update.diff == StateDiff::Addition =>
            {
                let view_addition = lookup_builtin_view_addition(som.clone());
                Self::BuiltinViewAdditions(vec![view_addition])
            }

            // Item-like updates (builtin view retractions fall through the
            // guard above and land here).
            IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
                Self::Items(vec![update])
            }

            // Everything else is a plain update.
            Role(_)
            | RoleAuth(_)
            | Database(_)
            | Schema(_)
            | DefaultPrivilege(_)
            | SystemPrivilege(_)
            | SystemConfiguration(_)
            | Cluster(_)
            | NetworkPolicy(_)
            | ClusterReplica(_)
            | SourceReferences(_)
            | Comment(_)
            | AuditLog(_)
            | StorageCollectionMetadata(_)
            | UnfinalizedShard(_) => Self::Updates(vec![update]),
        }
    }
2408
2409 async fn apply(
2415 self,
2416 state: &mut CatalogState,
2417 retractions: &mut InProgressRetractions,
2418 local_expression_cache: &mut LocalExpressionCache,
2419 ) -> (
2420 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2421 Vec<ParsedStateUpdate>,
2422 ) {
2423 match self {
2424 Self::BuiltinViewAdditions(builtin_view_additions) => {
2425 let restore = Arc::clone(&state.system_configuration);
2426 Arc::make_mut(&mut state.system_configuration).enable_for_item_parsing();
2427 let builtin_table_updates = CatalogState::parse_builtin_views(
2428 state,
2429 builtin_view_additions,
2430 retractions,
2431 local_expression_cache,
2432 )
2433 .await;
2434 state.system_configuration = restore;
2435 (builtin_table_updates, Vec::new())
2436 }
2437 Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
2438 state
2439 .apply_updates_inner(updates, retractions, local_expression_cache)
2440 .expect("corrupt catalog")
2441 }),
2442 Self::Updates(updates) => state
2443 .apply_updates_inner(updates, retractions, local_expression_cache)
2444 .expect("corrupt catalog"),
2445 }
2446 }
2447
2448 async fn step(
2449 self,
2450 next: Self,
2451 state: &mut CatalogState,
2452 retractions: &mut InProgressRetractions,
2453 local_expression_cache: &mut LocalExpressionCache,
2454 ) -> (
2455 Self,
2456 (
2457 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2458 Vec<ParsedStateUpdate>,
2459 ),
2460 ) {
2461 match (self, next) {
2462 (
2463 Self::BuiltinViewAdditions(mut builtin_view_additions),
2464 Self::BuiltinViewAdditions(next_builtin_view_additions),
2465 ) => {
2466 builtin_view_additions.extend(next_builtin_view_additions);
2468 (
2469 Self::BuiltinViewAdditions(builtin_view_additions),
2470 (Vec::new(), Vec::new()),
2471 )
2472 }
2473 (Self::Items(mut updates), Self::Items(next_updates)) => {
2474 updates.extend(next_updates);
2476 (Self::Items(updates), (Vec::new(), Vec::new()))
2477 }
2478 (Self::Updates(mut updates), Self::Updates(next_updates)) => {
2479 updates.extend(next_updates);
2481 (Self::Updates(updates), (Vec::new(), Vec::new()))
2482 }
2483 (apply_state, next_apply_state) => {
2484 let updates = apply_state
2486 .apply(state, retractions, local_expression_cache)
2487 .await;
2488 (next_apply_state, updates)
2489 }
2490 }
2491 }
2492}
2493
/// A minimal map abstraction over the mutation operations needed by the
/// `apply_*` helpers below, letting them work with both `BTreeMap` and
/// `imbl::OrdMap`.
trait MutableMap<K, V> {
    /// Inserts `value` at `key`, returning the previously stored value, if any.
    fn insert(&mut self, key: K, value: V) -> Option<V>;
    /// Removes and returns the value at `key`, if present.
    fn remove(&mut self, key: &K) -> Option<V>;
}
2500
2501impl<K: Ord, V> MutableMap<K, V> for BTreeMap<K, V> {
2502 fn insert(&mut self, key: K, value: V) -> Option<V> {
2503 BTreeMap::insert(self, key, value)
2504 }
2505 fn remove(&mut self, key: &K) -> Option<V> {
2506 BTreeMap::remove(self, key)
2507 }
2508}
2509
2510impl<K: Ord + Clone, V: Clone> MutableMap<K, V> for imbl::OrdMap<K, V> {
2511 fn insert(&mut self, key: K, value: V) -> Option<V> {
2512 imbl::OrdMap::insert(self, key, value)
2513 }
2514 fn remove(&mut self, key: &K) -> Option<V> {
2515 imbl::OrdMap::remove(self, key)
2516 }
2517}
2518
2519fn apply_inverted_lookup<K, V>(map: &mut impl MutableMap<K, V>, key: &K, value: V, diff: StateDiff)
2524where
2525 K: Ord + Clone + Debug,
2526 V: PartialEq + Debug,
2527{
2528 match diff {
2529 StateDiff::Retraction => {
2530 let prev = map.remove(key);
2531 assert_eq!(
2532 prev,
2533 Some(value),
2534 "retraction does not match existing value: {key:?}"
2535 );
2536 }
2537 StateDiff::Addition => {
2538 let prev = map.insert(key.clone(), value);
2539 assert_eq!(
2540 prev, None,
2541 "values must be explicitly retracted before inserting a new value: {key:?}"
2542 );
2543 }
2544 }
2545}
2546
2547fn apply_with_update<K, V, D>(
2550 map: &mut impl MutableMap<K, V>,
2551 durable: D,
2552 key_fn: impl FnOnce(&D) -> K,
2553 diff: StateDiff,
2554 retractions: &mut BTreeMap<D::Key, V>,
2555) where
2556 K: Ord,
2557 V: UpdateFrom<D> + PartialEq + Debug,
2558 D: DurableType,
2559 D::Key: Ord,
2560{
2561 match diff {
2562 StateDiff::Retraction => {
2563 let mem_key = key_fn(&durable);
2564 let value = map
2565 .remove(&mem_key)
2566 .expect("retraction does not match existing value: {key:?}");
2567 let durable_key = durable.into_key_value().0;
2568 retractions.insert(durable_key, value);
2569 }
2570 StateDiff::Addition => {
2571 let mem_key = key_fn(&durable);
2572 let durable_key = durable.key();
2573 let value = match retractions.remove(&durable_key) {
2574 Some(mut retraction) => {
2575 retraction.update_from(durable);
2576 retraction
2577 }
2578 None => durable.into(),
2579 };
2580 let prev = map.insert(mem_key, value);
2581 assert_eq!(
2582 prev, None,
2583 "values must be explicitly retracted before inserting a new value"
2584 );
2585 }
2586 }
2587}
2588
2589fn lookup_builtin_view_addition(
2591 mapping: SystemObjectMapping,
2592) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2593 let (_, builtin) = BUILTIN_LOOKUP
2594 .get(&mapping.description)
2595 .expect("missing builtin view");
2596 let Builtin::View(view) = builtin else {
2597 unreachable!("programming error, expected BuiltinView found {builtin:?}");
2598 };
2599
2600 (
2601 view,
2602 mapping.unique_identifier.catalog_id,
2603 mapping.unique_identifier.global_id,
2604 )
2605}