1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25 BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28 ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29 SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, Func, Index, Log, NetworkPolicy,
35 Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36 TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
40use mz_controller_types::ClusterId;
41use mz_expr::MirScalarExpr;
42use mz_ore::collections::CollectionExt;
43use mz_ore::tracing::OpenTelemetryContext;
44use mz_ore::{instrument, soft_assert_no_log};
45use mz_pgrepr::oid::INVALID_OID;
46use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
47use mz_repr::role_id::RoleId;
48use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
49use mz_sql::catalog::CatalogError as SqlCatalogError;
50use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
51use mz_sql::names::{
52 FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
53 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
54};
55use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
56use mz_sql::session::vars::{VarError, VarInput};
57use mz_sql::{plan, rbac};
58use mz_sql_parser::ast::Expr;
59use mz_storage_types::sources::Timeline;
60use tracing::{info_span, warn};
61
62use crate::AdapterError;
63use crate::catalog::state::LocalExpressionCache;
64use crate::catalog::{BuiltinTableUpdate, CatalogState};
65use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
66use crate::util::{index_sql, sort_topological};
67
/// Retractions that have been applied while processing the current batch of
/// updates, kept around so that a matching addition at the same timestamp can
/// reuse the previously built in-memory object instead of rebuilding it from
/// scratch (e.g. re-parsing an item's SQL; see `apply_item_update`).
#[derive(Debug, Clone, Default)]
struct InProgressRetractions {
    // Each map is keyed by the object's durable key (or by `CatalogItemId`
    // for item-shaped objects) and stores the retracted in-memory value.
    roles: BTreeMap<RoleKey, Role>,
    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
    databases: BTreeMap<DatabaseKey, Database>,
    schemas: BTreeMap<SchemaKey, Schema>,
    clusters: BTreeMap<ClusterKey, Cluster>,
    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
    items: BTreeMap<ItemKey, CatalogEntry>,
    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}
92
93impl CatalogState {
    /// Applies a batch of durable state updates to the in-memory catalog
    /// state.
    ///
    /// Updates are first consolidated, then grouped by timestamp; each
    /// timestamp group is sorted (via `sort_updates`) and applied as a unit
    /// with its own set of in-progress retractions, so a retraction and
    /// re-addition at the same timestamp can reuse in-memory objects.
    ///
    /// Returns the builtin-table updates and parsed catalog updates produced
    /// while applying the batch.
    #[must_use]
    #[instrument]
    pub(crate) async fn apply_updates(
        &mut self,
        updates: Vec<StateUpdate>,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
        Vec<ParsedStateUpdate>,
    ) {
        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::with_capacity(updates.len());

        let updates = Self::consolidate_updates(updates);

        // Group updates by timestamp; each group must be applied atomically
        // with respect to retraction reuse.
        let mut groups: Vec<Vec<_>> = Vec::new();
        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
            let updates = sort_updates(updates.collect());
            groups.push(updates);
        }

        for updates in groups {
            // `ApplyState` batches adjacent updates before flushing them to
            // the state — see `ApplyState::step`/`apply` for the exact
            // batching rules (defined elsewhere in this module).
            let mut apply_state = ApplyState::Updates(Vec::new());
            let mut retractions = InProgressRetractions::default();

            for update in updates {
                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
                    .step(
                        ApplyState::new(update),
                        self,
                        &mut retractions,
                        local_expression_cache,
                    )
                    .await;
                apply_state = next_apply_state;
                builtin_table_updates.extend(builtin_table_update);
                catalog_updates.extend(catalog_update);
            }

            // Flush whatever the state machine is still buffering for this
            // timestamp group.
            let (builtin_table_update, catalog_update) = apply_state
                .apply(self, &mut retractions, local_expression_cache)
                .await;
            builtin_table_updates.extend(builtin_table_update);
            catalog_updates.extend(catalog_update);
        }

        (builtin_table_updates, catalog_updates)
    }
154
155 fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
163 let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
164 .into_iter()
165 .map(|update| (update.kind, update.ts, update.diff.into()))
166 .collect_vec();
167
168 consolidate_updates(&mut updates);
169
170 updates
171 .into_iter()
172 .map(|(kind, ts, diff)| StateUpdate {
173 kind,
174 ts,
175 diff: diff
176 .try_into()
177 .expect("catalog state cannot have diff other than -1 or 1"),
178 })
179 .collect_vec()
180 }
181
    /// Applies a slice of same-timestamp updates to the in-memory state,
    /// producing the corresponding builtin-table updates and parsed catalog
    /// updates.
    ///
    /// The ordering within each arm is deliberate: for retractions, the
    /// update is parsed and the builtin-table retraction generated *before*
    /// the state change, so both observe the still-present object; for
    /// additions, the state change happens *first*, so parsing and rendering
    /// observe the newly added object.
    #[instrument(level = "debug")]
    fn apply_updates_inner(
        &mut self,
        updates: Vec<StateUpdate>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        // Callers are expected to pass a single timestamp group.
        soft_assert_no_log!(
            updates.iter().map(|update| update.ts).all_equal(),
            "all timestamps should be equal: {updates:?}"
        );

        // Dyncfg propagation is deferred until the whole batch is applied.
        let mut update_system_config = false;

        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::new();

        for state_update in updates {
            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
                update_system_config = true;
            }

            match state_update.diff {
                StateDiff::Retraction => {
                    // Parse against the pre-retraction state.
                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }

                    // Render the builtin-table retraction before the object
                    // disappears from the state.
                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));
                    self.apply_update(
                        state_update.kind,
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                }
                StateDiff::Addition => {
                    // Apply first so parsing/rendering see the new object.
                    self.apply_update(
                        state_update.kind.clone(),
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));

                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }
                }
            }
        }

        if update_system_config {
            self.system_configuration.dyncfg_updates();
        }

        Ok((builtin_table_updates, catalog_updates))
    }
266
    /// Dispatches a single state update to the kind-specific apply method.
    ///
    /// The match is exhaustive over `StateUpdateKind`, so adding a new kind
    /// forces a compile error here until it is handled.
    #[instrument(level = "debug")]
    fn apply_update(
        &mut self,
        kind: StateUpdateKind,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<(), CatalogError> {
        match kind {
            StateUpdateKind::Role(role) => {
                self.apply_role_update(role, diff, retractions);
            }
            StateUpdateKind::RoleAuth(role_auth) => {
                self.apply_role_auth_update(role_auth, diff, retractions);
            }
            StateUpdateKind::Database(database) => {
                self.apply_database_update(database, diff, retractions);
            }
            StateUpdateKind::Schema(schema) => {
                self.apply_schema_update(schema, diff, retractions);
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                self.apply_default_privilege_update(default_privilege, diff, retractions);
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                self.apply_system_privilege_update(system_privilege, diff, retractions);
            }
            StateUpdateKind::SystemConfiguration(system_configuration) => {
                self.apply_system_configuration_update(system_configuration, diff, retractions);
            }
            StateUpdateKind::Cluster(cluster) => {
                self.apply_cluster_update(cluster, diff, retractions);
            }
            StateUpdateKind::NetworkPolicy(network_policy) => {
                self.apply_network_policy_update(network_policy, diff, retractions);
            }
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.apply_introspection_source_index_update(
                    introspection_source_index,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => {
                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
            }
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                self.apply_system_object_mapping_update(
                    system_object_mapping,
                    diff,
                    retractions,
                    local_expression_cache,
                );
            }
            StateUpdateKind::TemporaryItem(item) => {
                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
            }
            StateUpdateKind::Item(item) => {
                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
            }
            StateUpdateKind::Comment(comment) => {
                self.apply_comment_update(comment, diff, retractions);
            }
            StateUpdateKind::SourceReferences(source_reference) => {
                self.apply_source_references_update(source_reference, diff, retractions);
            }
            StateUpdateKind::AuditLog(_audit_log) => {
                // Deliberate no-op: audit log events are not reflected in the
                // in-memory catalog state.
            }
            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
                self.apply_storage_collection_metadata_update(
                    storage_collection_metadata,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
            }
        }

        Ok(())
    }
350
351 #[instrument(level = "debug")]
352 fn apply_role_auth_update(
353 &mut self,
354 role_auth: mz_catalog::durable::RoleAuth,
355 diff: StateDiff,
356 retractions: &mut InProgressRetractions,
357 ) {
358 apply_with_update(
359 &mut self.role_auth_by_id,
360 role_auth,
361 |role_auth| role_auth.role_id,
362 diff,
363 &mut retractions.role_auths,
364 );
365 }
366
367 #[instrument(level = "debug")]
368 fn apply_role_update(
369 &mut self,
370 role: mz_catalog::durable::Role,
371 diff: StateDiff,
372 retractions: &mut InProgressRetractions,
373 ) {
374 apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
375 apply_with_update(
376 &mut self.roles_by_id,
377 role,
378 |role| role.id,
379 diff,
380 &mut retractions.roles,
381 );
382 }
383
384 #[instrument(level = "debug")]
385 fn apply_database_update(
386 &mut self,
387 database: mz_catalog::durable::Database,
388 diff: StateDiff,
389 retractions: &mut InProgressRetractions,
390 ) {
391 apply_inverted_lookup(
392 &mut self.database_by_name,
393 &database.name,
394 database.id,
395 diff,
396 );
397 apply_with_update(
398 &mut self.database_by_id,
399 database,
400 |database| database.id,
401 diff,
402 &mut retractions.databases,
403 );
404 }
405
406 #[instrument(level = "debug")]
407 fn apply_schema_update(
408 &mut self,
409 schema: mz_catalog::durable::Schema,
410 diff: StateDiff,
411 retractions: &mut InProgressRetractions,
412 ) {
413 match &schema.database_id {
414 Some(database_id) => {
415 let db = self
416 .database_by_id
417 .get_mut(database_id)
418 .expect("catalog out of sync");
419 apply_inverted_lookup(&mut db.schemas_by_name, &schema.name, schema.id, diff);
420 apply_with_update(
421 &mut db.schemas_by_id,
422 schema,
423 |schema| schema.id,
424 diff,
425 &mut retractions.schemas,
426 );
427 }
428 None => {
429 apply_inverted_lookup(
430 &mut self.ambient_schemas_by_name,
431 &schema.name,
432 schema.id,
433 diff,
434 );
435 apply_with_update(
436 &mut self.ambient_schemas_by_id,
437 schema,
438 |schema| schema.id,
439 diff,
440 &mut retractions.schemas,
441 );
442 }
443 }
444 }
445
446 #[instrument(level = "debug")]
447 fn apply_default_privilege_update(
448 &mut self,
449 default_privilege: mz_catalog::durable::DefaultPrivilege,
450 diff: StateDiff,
451 _retractions: &mut InProgressRetractions,
452 ) {
453 match diff {
454 StateDiff::Addition => Arc::make_mut(&mut self.default_privileges)
455 .grant(default_privilege.object, default_privilege.acl_item),
456 StateDiff::Retraction => Arc::make_mut(&mut self.default_privileges)
457 .revoke(&default_privilege.object, &default_privilege.acl_item),
458 }
459 }
460
461 #[instrument(level = "debug")]
462 fn apply_system_privilege_update(
463 &mut self,
464 system_privilege: MzAclItem,
465 diff: StateDiff,
466 _retractions: &mut InProgressRetractions,
467 ) {
468 match diff {
469 StateDiff::Addition => {
470 Arc::make_mut(&mut self.system_privileges).grant(system_privilege)
471 }
472 StateDiff::Retraction => {
473 Arc::make_mut(&mut self.system_privileges).revoke(&system_privilege)
474 }
475 }
476 }
477
478 #[instrument(level = "debug")]
479 fn apply_system_configuration_update(
480 &mut self,
481 system_configuration: mz_catalog::durable::SystemConfiguration,
482 diff: StateDiff,
483 _retractions: &mut InProgressRetractions,
484 ) {
485 let res = match diff {
486 StateDiff::Addition => self.insert_system_configuration(
487 &system_configuration.name,
488 VarInput::Flat(&system_configuration.value),
489 ),
490 StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
491 };
492 match res {
493 Ok(_) => (),
494 Err(Error {
498 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
499 }) => {
500 warn!(%name, "unknown system parameter from catalog storage");
501 }
502 Err(e) => panic!("unable to update system variable: {e:?}"),
503 }
504 }
505
506 #[instrument(level = "debug")]
507 fn apply_cluster_update(
508 &mut self,
509 cluster: mz_catalog::durable::Cluster,
510 diff: StateDiff,
511 retractions: &mut InProgressRetractions,
512 ) {
513 apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
514 apply_with_update(
515 &mut self.clusters_by_id,
516 cluster,
517 |cluster| cluster.id,
518 diff,
519 &mut retractions.clusters,
520 );
521 }
522
523 #[instrument(level = "debug")]
524 fn apply_network_policy_update(
525 &mut self,
526 policy: mz_catalog::durable::NetworkPolicy,
527 diff: StateDiff,
528 retractions: &mut InProgressRetractions,
529 ) {
530 apply_inverted_lookup(
531 &mut self.network_policies_by_name,
532 &policy.name,
533 policy.id,
534 diff,
535 );
536 apply_with_update(
537 &mut self.network_policies_by_id,
538 policy,
539 |policy| policy.id,
540 diff,
541 &mut retractions.network_policies,
542 );
543 }
544
    /// Applies an introspection-source-index addition/retraction.
    ///
    /// On addition, a matching in-progress retraction is reused (with a
    /// freshly built index item) when present; otherwise a new introspection
    /// source index entry is inserted. On retraction, the entry is dropped
    /// and stashed in `retractions`.
    #[instrument(level = "debug")]
    fn apply_introspection_source_index_update(
        &mut self,
        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        let cluster = self
            .clusters_by_id
            .get_mut(&introspection_source_index.cluster_id)
            .expect("catalog out of sync");
        // The durable name must correspond to a known builtin log.
        let log = BUILTIN_LOG_LOOKUP
            .get(introspection_source_index.name.as_str())
            .expect("missing log");
        // Keep the cluster's log-variant -> index-id map in sync regardless
        // of diff direction.
        apply_inverted_lookup(
            &mut cluster.log_indexes,
            &log.variant,
            introspection_source_index.index_id,
            diff,
        );

        match diff {
            StateDiff::Addition => {
                if let Some(mut entry) = retractions
                    .introspection_source_indexes
                    .remove(&introspection_source_index.item_id)
                {
                    // Reuse the retracted entry, but rebuild its index item
                    // (cluster/index ids may embed new values).
                    let (index_name, index) = self.create_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.index_id,
                    );
                    // Identity fields must match the retracted entry exactly.
                    assert_eq!(entry.id, introspection_source_index.item_id);
                    assert_eq!(entry.oid, introspection_source_index.oid);
                    assert_eq!(entry.name, index_name);
                    entry.item = index;
                    self.insert_entry(entry);
                } else {
                    self.insert_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.item_id,
                        introspection_source_index.index_id,
                        introspection_source_index.oid,
                    );
                }
            }
            StateDiff::Retraction => {
                let entry = self.drop_item(introspection_source_index.item_id);
                retractions
                    .introspection_source_indexes
                    .insert(entry.id, entry);
            }
        }
    }
602
    /// Applies a cluster-replica addition/retraction to the owning cluster's
    /// in-memory replica maps.
    ///
    /// Replicas are not stashed in `retractions`; a retraction simply removes
    /// the replica and an addition builds a fresh `ClusterReplica`.
    #[instrument(level = "debug")]
    fn apply_cluster_replica_update(
        &mut self,
        cluster_replica: mz_catalog::durable::ClusterReplica,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        let cluster = self
            .clusters_by_id
            .get(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        let azs = cluster.availability_zones();
        let location = self
            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
            .expect("catalog in unexpected state");
        // Re-fetch the cluster mutably: the immutable borrow above had to end
        // so that `concretize_replica_location` could borrow `self` — the
        // double lookup is deliberate, not redundant.
        let cluster = self
            .clusters_by_id
            .get_mut(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        // Keep the replica name -> id index in sync for both diff directions.
        apply_inverted_lookup(
            &mut cluster.replica_id_by_name_,
            &cluster_replica.name,
            cluster_replica.replica_id,
            diff,
        );
        match diff {
            StateDiff::Retraction => {
                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
                assert!(
                    prev.is_some(),
                    "retraction does not match existing value: {:?}",
                    cluster_replica.replica_id
                );
            }
            StateDiff::Addition => {
                // Build the in-memory replica config from the durable one.
                let logging = ReplicaLogging {
                    log_logging: cluster_replica.config.logging.log_logging,
                    interval: cluster_replica.config.logging.interval,
                };
                let config = ReplicaConfig {
                    location,
                    compute: ComputeReplicaConfig { logging },
                };
                let mem_cluster_replica = ClusterReplica {
                    name: cluster_replica.name.clone(),
                    cluster_id: cluster_replica.cluster_id,
                    replica_id: cluster_replica.replica_id,
                    config,
                    owner_id: cluster_replica.owner_id,
                };
                let prev = cluster
                    .replicas_by_id_
                    .insert(cluster_replica.replica_id, mem_cluster_replica);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value: {:?}",
                    cluster_replica.replica_id
                );
            }
        }
    }
664
    /// Applies a system-object-mapping addition/retraction, i.e. materializes
    /// (or removes) the in-memory catalog entry for a builtin object.
    ///
    /// Retractions stash the dropped entry in `retractions`; a matching
    /// addition at the same timestamp reinserts it unchanged. A fresh
    /// addition looks the builtin up in `BUILTIN_LOOKUP` and builds the
    /// appropriate `CatalogItem` per builtin kind.
    #[instrument(level = "debug")]
    fn apply_system_object_mapping_update(
        &mut self,
        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        let item_id = system_object_mapping.unique_identifier.catalog_id;
        let global_id = system_object_mapping.unique_identifier.global_id;

        // Runtime-alterable builtins are not handled here — presumably they
        // flow through the regular item path instead; confirm with callers.
        if system_object_mapping.unique_identifier.runtime_alterable() {
            return;
        }

        if let StateDiff::Retraction = diff {
            let entry = self.drop_item(item_id);
            retractions.system_object_mappings.insert(item_id, entry);
            return;
        }

        // Addition: reuse an in-progress retraction verbatim if one exists.
        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
            self.insert_entry(entry);
            return;
        }

        // Fresh addition: resolve the builtin definition and its ambient
        // schema, then construct the in-memory item.
        let builtin = BUILTIN_LOOKUP
            .get(&system_object_mapping.description)
            .expect("missing builtin")
            .1;
        let schema_name = builtin.schema();
        let schema_id = self
            .ambient_schemas_by_name
            .get(schema_name)
            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
        let name = QualifiedItemName {
            qualifiers: ItemQualifiers {
                database_spec: ResolvedDatabaseSpecifier::Ambient,
                schema_spec: SchemaSpecifier::Id(*schema_id),
            },
            item: builtin.name().into(),
        };
        match builtin {
            Builtin::Log(log) => {
                // Logs are owned by mz_system with source-style privileges
                // plus any extra access declared on the builtin.
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&log.access);
                self.insert_item(
                    item_id,
                    log.oid,
                    name.clone(),
                    CatalogItem::Log(Log {
                        variant: log.variant,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }

            Builtin::Table(table) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Table,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&table.access);

                self.insert_item(
                    item_id,
                    table.oid,
                    name.clone(),
                    CatalogItem::Table(Table {
                        create_sql: None,
                        desc: VersionedRelationDesc::new(table.desc.clone()),
                        // Builtin tables start with a single root version.
                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                        conn_id: None,
                        resolved_ids: ResolvedIds::empty(),
                        // Retained-metrics tables get a custom compaction
                        // window from the system config.
                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: table.is_retained_metrics_object,
                        data_source: TableDataSource::TableWrites {
                            defaults: vec![Expr::null(); table.desc.arity()],
                        },
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::Index(index) => {
                let custom_logical_compaction_window =
                    index.is_retained_metrics_object.then(|| {
                        self.system_config()
                            .metrics_retention()
                            .try_into()
                            .expect("invalid metrics retention")
                    });
                // Builtin indexes carry no extra relation versions.
                let versions = BTreeMap::new();

                // Builtin indexes are defined by SQL and must parse/plan.
                let item = self
                    .parse_item(
                        global_id,
                        &index.create_sql(),
                        &versions,
                        None,
                        index.is_retained_metrics_object,
                        custom_logical_compaction_window,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap index:\n\
                            {}\n\
                            error:\n\
                            {:?}\n\n\
                            make sure that the schema name is specified in the builtin index's create sql statement.",
                            index.name, e
                        )
                    });
                let CatalogItem::Index(_) = item else {
                    panic!(
                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
                        index.name
                    );
                };

                self.insert_item(
                    item_id,
                    index.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }
            Builtin::View(_) => {
                // Builtin views are not applied through this path.
                unreachable!("views added elsewhere");
            }

            Builtin::Type(typ) => {
                let typ = self.resolve_builtin_type_references(typ);
                // Array types back-patch their element type's `array_id`.
                if let CatalogType::Array { element_reference } = typ.details.typ {
                    let entry = self.get_entry_mut(&element_reference);
                    let item_type = match &mut entry.item {
                        CatalogItem::Type(item_type) => item_type,
                        _ => unreachable!("types can only reference other types"),
                    };
                    item_type.details.array_id = Some(item_id);
                }

                let schema_id = self.resolve_system_schema(typ.schema);

                self.insert_item(
                    item_id,
                    typ.oid,
                    QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(schema_id),
                        },
                        item: typ.name.to_owned(),
                    },
                    CatalogItem::Type(Type {
                        create_sql: None,
                        global_id,
                        details: typ.details.clone(),
                        resolved_ids: ResolvedIds::empty(),
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(vec![
                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
                    ]),
                );
            }

            Builtin::Func(func) => {
                // Builtin functions get no OID here — presumably assigned or
                // resolved elsewhere; confirm before relying on it.
                let oid = INVALID_OID;
                self.insert_item(
                    item_id,
                    oid,
                    name.clone(),
                    CatalogItem::Func(Func {
                        inner: func.inner,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }

            Builtin::Source(coll) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&coll.access);

                self.insert_item(
                    item_id,
                    coll.oid,
                    name.clone(),
                    CatalogItem::Source(Source {
                        create_sql: None,
                        data_source: coll.data_source.clone(),
                        desc: coll.desc.clone(),
                        global_id,
                        timeline: Timeline::EpochMilliseconds,
                        resolved_ids: ResolvedIds::empty(),
                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: coll.is_retained_metrics_object,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::ContinualTask(ct) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&ct.access);
                // Builtin continual tasks carry no extra relation versions.
                let versions = BTreeMap::new();

                let item = self
                    .parse_item(
                        global_id,
                        &ct.create_sql(),
                        &versions,
                        None,
                        false,
                        None,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap continual task:\n\
                            {}\n\
                            error:\n\
                            {:?}\n\n\
                            make sure that the schema name is specified in the builtin continual task's create sql statement.",
                            ct.name, e
                        )
                    });
                let CatalogItem::ContinualTask(_) = &item else {
                    panic!(
                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
                        ct.name
                    );
                };

                self.insert_item(
                    item_id,
                    ct.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::Connection(connection) => {
                // Builtin connections carry no extra relation versions.
                let versions = BTreeMap::new();
                let mut item = self
                    .parse_item(
                        global_id,
                        connection.sql,
                        &versions,
                        None,
                        false,
                        None,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap connection:\n\
                            {}\n\
                            error:\n\
                            {:?}\n\n\
                            make sure that the schema name is specified in the builtin connection's create sql statement.",
                            connection.name, e
                        )
                    });
                let CatalogItem::Connection(_) = &mut item else {
                    panic!(
                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
                        connection.name
                    );
                };

                // Unlike most builtins, connections use the owner declared on
                // the builtin definition rather than mz_system.
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    connection.owner_id.clone(),
                )];
                acl_items.extend_from_slice(connection.access);

                self.insert_item(
                    item_id,
                    connection.oid,
                    name.clone(),
                    item,
                    connection.owner_id.clone(),
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
        }
    }
1003
    /// Applies a temporary-item addition/retraction.
    ///
    /// On addition, the owning connection's temporary schema is created on
    /// demand, and a matching in-progress retraction is reused — its item is
    /// only re-deserialized when the persisted SQL changed. On retraction,
    /// the entry is dropped and stashed in `retractions.temp_items`.
    #[instrument(level = "debug")]
    fn apply_temporary_item_update(
        &mut self,
        temporary_item: TemporaryItem,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        match diff {
            StateDiff::Addition => {
                let TemporaryItem {
                    id,
                    oid,
                    global_id,
                    schema_id,
                    name,
                    conn_id,
                    create_sql,
                    owner_id,
                    privileges,
                    extra_versions,
                } = temporary_item;
                let temp_conn_id = conn_id
                    .as_ref()
                    .expect("temporary items must have a connection id");
                // Lazily create the per-connection temporary schema.
                if !self.temporary_schemas.contains_key(temp_conn_id) {
                    self.create_temporary_schema(temp_conn_id, owner_id)
                        .expect("failed to create temporary schema");
                }
                let schema = self.find_temp_schema(&schema_id);
                let name = QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: schema.database().clone(),
                        schema_spec: schema.id().clone(),
                    },
                    item: name.clone(),
                };

                let entry = match retractions.temp_items.remove(&id) {
                    Some(mut retraction) => {
                        assert_eq!(retraction.id, id);

                        // Only re-deserialize when the SQL actually changed;
                        // otherwise reuse the previously built item as-is.
                        if retraction.create_sql() != create_sql {
                            let mut catalog_item = self
                                .deserialize_item(
                                    global_id,
                                    &create_sql,
                                    &extra_versions,
                                    local_expression_cache,
                                    Some(retraction.item),
                                )
                                .unwrap_or_else(|e| {
                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
                                });
                            // The connection id is not part of the persisted
                            // SQL, so it must be restored explicitly.
                            catalog_item.set_conn_id(conn_id);
                            retraction.item = catalog_item;
                        }

                        // Refresh the identity/metadata fields from the
                        // durable update.
                        retraction.id = id;
                        retraction.oid = oid;
                        retraction.name = name;
                        retraction.owner_id = owner_id;
                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
                        retraction
                    }
                    None => {
                        let mut catalog_item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                None,
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });

                        // See above: the connection id is restored manually.
                        catalog_item.set_conn_id(conn_id);

                        CatalogEntry {
                            item: catalog_item,
                            referenced_by: Vec::new(),
                            used_by: Vec::new(),
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                        }
                    }
                };
                self.insert_entry(entry);
            }
            StateDiff::Retraction => {
                let entry = self.drop_item(temporary_item.id);
                retractions.temp_items.insert(temporary_item.id, entry);
            }
        }
    }
1121
1122 #[instrument(level = "debug")]
1123 fn apply_item_update(
1124 &mut self,
1125 item: mz_catalog::durable::Item,
1126 diff: StateDiff,
1127 retractions: &mut InProgressRetractions,
1128 local_expression_cache: &mut LocalExpressionCache,
1129 ) -> Result<(), CatalogError> {
1130 match diff {
1131 StateDiff::Addition => {
1132 let key = item.key();
1133 let mz_catalog::durable::Item {
1134 id,
1135 oid,
1136 global_id,
1137 schema_id,
1138 name,
1139 create_sql,
1140 owner_id,
1141 privileges,
1142 extra_versions,
1143 } = item;
1144 let schema = self.find_non_temp_schema(&schema_id);
1145 let name = QualifiedItemName {
1146 qualifiers: ItemQualifiers {
1147 database_spec: schema.database().clone(),
1148 schema_spec: schema.id().clone(),
1149 },
1150 item: name.clone(),
1151 };
1152 let entry = match retractions.items.remove(&key) {
1153 Some(retraction) => {
1154 assert_eq!(retraction.id, item.id);
1155
1156 let item = self
1157 .deserialize_item(
1158 global_id,
1159 &create_sql,
1160 &extra_versions,
1161 local_expression_cache,
1162 Some(retraction.item),
1163 )
1164 .unwrap_or_else(|e| {
1165 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1166 });
1167
1168 CatalogEntry {
1169 item,
1170 id,
1171 oid,
1172 name,
1173 owner_id,
1174 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1175 referenced_by: retraction.referenced_by,
1176 used_by: retraction.used_by,
1177 }
1178 }
1179 None => {
1180 let catalog_item = self
1181 .deserialize_item(
1182 global_id,
1183 &create_sql,
1184 &extra_versions,
1185 local_expression_cache,
1186 None,
1187 )
1188 .unwrap_or_else(|e| {
1189 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1190 });
1191 CatalogEntry {
1192 item: catalog_item,
1193 referenced_by: Vec::new(),
1194 used_by: Vec::new(),
1195 id,
1196 oid,
1197 name,
1198 owner_id,
1199 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1200 }
1201 }
1202 };
1203
1204 self.insert_entry(entry);
1205 }
1206 StateDiff::Retraction => {
1207 let entry = self.drop_item(item.id);
1208 let key = item.into_key_value().0;
1209 retractions.items.insert(key, entry);
1210 }
1211 }
1212 Ok(())
1213 }
1214
1215 #[instrument(level = "debug")]
1216 fn apply_comment_update(
1217 &mut self,
1218 comment: mz_catalog::durable::Comment,
1219 diff: StateDiff,
1220 _retractions: &mut InProgressRetractions,
1221 ) {
1222 match diff {
1223 StateDiff::Addition => {
1224 let prev = Arc::make_mut(&mut self.comments).update_comment(
1225 comment.object_id,
1226 comment.sub_component,
1227 Some(comment.comment),
1228 );
1229 assert_eq!(
1230 prev, None,
1231 "values must be explicitly retracted before inserting a new value"
1232 );
1233 }
1234 StateDiff::Retraction => {
1235 let prev = Arc::make_mut(&mut self.comments).update_comment(
1236 comment.object_id,
1237 comment.sub_component,
1238 None,
1239 );
1240 assert_eq!(
1241 prev,
1242 Some(comment.comment),
1243 "retraction does not match existing value: ({:?}, {:?})",
1244 comment.object_id,
1245 comment.sub_component,
1246 );
1247 }
1248 }
1249 }
1250
1251 #[instrument(level = "debug")]
1252 fn apply_source_references_update(
1253 &mut self,
1254 source_references: mz_catalog::durable::SourceReferences,
1255 diff: StateDiff,
1256 _retractions: &mut InProgressRetractions,
1257 ) {
1258 match diff {
1259 StateDiff::Addition => {
1260 let prev = self
1261 .source_references
1262 .insert(source_references.source_id, source_references.into());
1263 assert!(
1264 prev.is_none(),
1265 "values must be explicitly retracted before inserting a new value: {prev:?}"
1266 );
1267 }
1268 StateDiff::Retraction => {
1269 let prev = self.source_references.remove(&source_references.source_id);
1270 assert!(
1271 prev.is_some(),
1272 "retraction for a non-existent existing value: {source_references:?}"
1273 );
1274 }
1275 }
1276 }
1277
1278 #[instrument(level = "debug")]
1279 fn apply_storage_collection_metadata_update(
1280 &mut self,
1281 storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1282 diff: StateDiff,
1283 _retractions: &mut InProgressRetractions,
1284 ) {
1285 apply_inverted_lookup(
1286 &mut Arc::make_mut(&mut self.storage_metadata).collection_metadata,
1287 &storage_collection_metadata.id,
1288 storage_collection_metadata.shard,
1289 diff,
1290 );
1291 }
1292
1293 #[instrument(level = "debug")]
1294 fn apply_unfinalized_shard_update(
1295 &mut self,
1296 unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1297 diff: StateDiff,
1298 _retractions: &mut InProgressRetractions,
1299 ) {
1300 match diff {
1301 StateDiff::Addition => {
1302 let newly_inserted = Arc::make_mut(&mut self.storage_metadata)
1303 .unfinalized_shards
1304 .insert(unfinalized_shard.shard);
1305 assert!(
1306 newly_inserted,
1307 "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1308 );
1309 }
1310 StateDiff::Retraction => {
1311 let removed = Arc::make_mut(&mut self.storage_metadata)
1312 .unfinalized_shards
1313 .remove(&unfinalized_shard.shard);
1314 assert!(
1315 removed,
1316 "retraction does not match existing value: {unfinalized_shard:?}"
1317 );
1318 }
1319 }
1320 }
1321
1322 #[instrument]
1325 pub(crate) fn generate_builtin_table_updates(
1326 &self,
1327 updates: Vec<StateUpdate>,
1328 ) -> Vec<BuiltinTableUpdate> {
1329 let mut builtin_table_updates = Vec::new();
1330 for StateUpdate { kind, ts: _, diff } in updates {
1331 let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1332 let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1333 builtin_table_updates.extend(builtin_table_update);
1334 }
1335 builtin_table_updates
1336 }
1337
1338 #[instrument(level = "debug")]
1341 pub(crate) fn generate_builtin_table_update(
1342 &self,
1343 kind: StateUpdateKind,
1344 diff: StateDiff,
1345 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1346 let diff = diff.into();
1347 match kind {
1348 StateUpdateKind::Role(role) => {
1349 let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1350 for group_id in role.membership.map.keys() {
1351 builtin_table_updates
1352 .push(self.pack_role_members_update(*group_id, role.id, diff))
1353 }
1354 builtin_table_updates
1355 }
1356 StateUpdateKind::RoleAuth(role_auth) => {
1357 vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1358 }
1359 StateUpdateKind::Database(database) => {
1360 vec![self.pack_database_update(&database.id, diff)]
1361 }
1362 StateUpdateKind::Schema(schema) => {
1363 let db_spec = schema.database_id.into();
1364 vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1365 }
1366 StateUpdateKind::DefaultPrivilege(default_privilege) => {
1367 vec![self.pack_default_privileges_update(
1368 &default_privilege.object,
1369 &default_privilege.acl_item.grantee,
1370 &default_privilege.acl_item.acl_mode,
1371 diff,
1372 )]
1373 }
1374 StateUpdateKind::SystemPrivilege(system_privilege) => {
1375 vec![self.pack_system_privileges_update(system_privilege, diff)]
1376 }
1377 StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1378 StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1379 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1380 self.pack_item_update(introspection_source_index.item_id, diff)
1381 }
1382 StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1383 cluster_replica.cluster_id,
1384 &cluster_replica.name,
1385 diff,
1386 ),
1387 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1388 if !system_object_mapping.unique_identifier.runtime_alterable() {
1392 self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1393 } else {
1394 vec![]
1395 }
1396 }
1397 StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1398 StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1399 StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1400 comment.object_id,
1401 comment.sub_component,
1402 &comment.comment,
1403 diff,
1404 )],
1405 StateUpdateKind::SourceReferences(source_references) => {
1406 self.pack_source_references_update(&source_references, diff)
1407 }
1408 StateUpdateKind::AuditLog(audit_log) => {
1409 vec![
1410 self.pack_audit_log_update(&audit_log.event, diff)
1411 .expect("could not pack audit log update"),
1412 ]
1413 }
1414 StateUpdateKind::NetworkPolicy(policy) => self
1415 .pack_network_policy_update(&policy.id, diff)
1416 .expect("could not pack audit log update"),
1417 StateUpdateKind::StorageCollectionMetadata(_)
1418 | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1419 }
1420 }
1421
1422 fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1423 self.entry_by_id
1424 .get_mut(id)
1425 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1426 }
1427
1428 fn get_schema_mut(
1429 &mut self,
1430 database_spec: &ResolvedDatabaseSpecifier,
1431 schema_spec: &SchemaSpecifier,
1432 conn_id: &ConnectionId,
1433 ) -> &mut Schema {
1434 match (database_spec, schema_spec) {
1436 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1437 .temporary_schemas
1438 .get_mut(conn_id)
1439 .expect("catalog out of sync"),
1440 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1441 .ambient_schemas_by_id
1442 .get_mut(id)
1443 .expect("catalog out of sync"),
1444 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1445 .database_by_id
1446 .get_mut(database_id)
1447 .expect("catalog out of sync")
1448 .schemas_by_id
1449 .get_mut(schema_id)
1450 .expect("catalog out of sync"),
1451 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1452 unreachable!("temporary schemas are in the ambient database")
1453 }
1454 }
1455 }
1456
    /// Parses and installs the given builtin views, returning the builtin
    /// table updates for every installed view.
    ///
    /// Views whose system object mapping was retracted earlier in this batch
    /// are re-installed directly from the retracted entry; all other views
    /// are parsed from their `CREATE` SQL on blocking tasks, concurrently.
    /// A view whose parse fails because a dependency has not been installed
    /// yet is parked and retried once that dependency completes.
    #[instrument(name = "catalog::parse_views")]
    async fn parse_builtin_views(
        state: &mut CatalogState,
        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
        // Split the input into previously retracted entries (re-installed
        // as-is) and brand-new additions (parsed below).
        let (updates, additions): (Vec<_>, Vec<_>) =
            builtin_views
                .into_iter()
                .partition_map(|(view, item_id, gid)| {
                    match retractions.system_object_mappings.remove(&item_id) {
                        Some(entry) => Either::Left(entry),
                        None => Either::Right((view, item_id, gid)),
                    }
                });

        for entry in updates {
            let item_id = entry.id();
            state.insert_entry(entry);
            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
        }

        // In-flight parse tasks.
        let mut handles = Vec::new();
        // Views parked until a specific item id is installed.
        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
            BTreeMap::new();
        // Views parked until a specific item name is installed.
        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
        // Views parked until all currently in-flight work completes.
        let mut awaiting_all = Vec::new();
        // Ids and full names of views installed so far.
        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
        let mut completed_names: BTreeSet<String> = BTreeSet::new();

        // Views that still need to be parsed and installed.
        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
            .into_iter()
            .map(|(view, item_id, gid)| (item_id, (view, gid)))
            .collect();
        let item_ids: Vec<_> = views.keys().copied().collect();

        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
            // Nothing in flight and nothing ready: retry the views that were
            // waiting for all in-flight work to finish.
            if handles.is_empty() && ready.is_empty() {
                ready.extend(awaiting_all.drain(..));
            }

            if !ready.is_empty() {
                // Snapshot the current state so blocking tasks can parse
                // against it without borrowing `state`.
                let spawn_state = Arc::new(state.clone());
                while let Some(id) = ready.pop_front() {
                    let (view, global_id) = views.get(&id).expect("must exist");
                    let global_id = *global_id;
                    let create_sql = view.create_sql();
                    // Builtin views carry no extra relation versions.
                    let versions = BTreeMap::new();

                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
                    let task_state = Arc::clone(&spawn_state);
                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
                    let handle = mz_ore::task::spawn_blocking(
                        || "parse view",
                        move || {
                            span.in_scope(|| {
                                let res = task_state.parse_item_inner(
                                    global_id,
                                    &create_sql,
                                    &versions,
                                    None,
                                    false,
                                    None,
                                    cached_expr,
                                    None,
                                );
                                (id, global_id, res)
                            })
                        },
                    );
                    handles.push(handle);
                }
            }

            // Wait for any one in-flight parse to complete.
            let (selected, _idx, remaining) = future::select_all(handles).await;
            handles = remaining;
            let (id, global_id, res) = selected;
            // Return an unused cached expression to the cache so it can be
            // reused on the next parse attempt for this view.
            let mut insert_cached_expr = |cached_expr| {
                if let Some(cached_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
                }
            };
            match res {
                Ok((item, uncached_expr)) => {
                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                        local_expression_cache.insert_uncached_expression(
                            global_id,
                            uncached_expr,
                            optimizer_features,
                        );
                    }
                    // Install the parsed view into the ambient schema named
                    // by its builtin definition.
                    let (view, _gid) = views.remove(&id).expect("must exist");
                    let schema_id = state
                        .ambient_schemas_by_name
                        .get(view.schema)
                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
                    let qname = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(*schema_id),
                        },
                        item: view.name.into(),
                    };
                    let mut acl_items = vec![rbac::owner_privilege(
                        mz_sql::catalog::ObjectType::View,
                        MZ_SYSTEM_ROLE_ID,
                    )];
                    acl_items.extend_from_slice(&view.access);

                    state.insert_item(
                        id,
                        view.oid,
                        qname,
                        item,
                        MZ_SYSTEM_ROLE_ID,
                        PrivilegeMap::from_mz_acl_items(acl_items),
                    );

                    // Unpark any views that were waiting on this view's id or
                    // full name.
                    let mut resolved_dependent_items = Vec::new();
                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    let entry = state.get_entry(&id);
                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    ready.extend(resolved_dependent_items);

                    completed_ids.insert(id);
                    completed_names.insert(full_name);
                }
                // A dependency referenced by id is not installed yet: park
                // this view until that id completes (or retry immediately if
                // it completed while this parse was in flight).
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    if completed_ids.contains(&missing_dep) {
                        ready.push_back(id);
                    } else {
                        awaiting_id_dependencies
                            .entry(missing_dep)
                            .or_default()
                            .push(id);
                    }
                }
                // An unknown item: the missing dependency may be spelled as
                // an id-like string or as a full name; park on whichever the
                // string parses to.
                Err((
                    AdapterError::PlanError(plan::PlanError::Catalog(
                        SqlCatalogError::UnknownItem(missing_dep),
                    )),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    match CatalogItemId::from_str(&missing_dep) {
                        Ok(missing_dep) => {
                            if completed_ids.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_id_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                        Err(_) => {
                            if completed_names.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_name_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                    }
                }
                // NOTE(review): invalid casts are retried only after all
                // in-flight work completes — presumably caused by a
                // dependency (e.g. a type) not being installed yet; confirm.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    awaiting_all.push(id);
                }
                // Any other error is a bug in a builtin view definition.
                Err((e, _)) => {
                    let (bad_view, _gid) = views.get(&id).expect("must exist");
                    panic!(
                        "internal error: failed to load bootstrap view:\n\
                        {name}\n\
                        error:\n\
                        {e:?}\n\n\
                        Make sure that the schema name is specified in the builtin view's create sql statement.
                        ",
                        name = bad_view.name,
                    )
                }
            }
        }

        // Every parked view must have eventually been installed.
        assert!(awaiting_id_dependencies.is_empty());
        assert!(
            awaiting_name_dependencies.is_empty(),
            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
        );
        assert!(awaiting_all.is_empty());
        assert!(views.is_empty());

        builtin_table_updates.extend(
            item_ids
                .into_iter()
                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
        );

        builtin_table_updates
    }
1701
1702 fn insert_entry(&mut self, entry: CatalogEntry) {
1704 if !entry.id.is_system() {
1705 if let Some(cluster_id) = entry.item.cluster_id() {
1706 self.clusters_by_id
1707 .get_mut(&cluster_id)
1708 .expect("catalog out of sync")
1709 .bound_objects
1710 .insert(entry.id);
1711 };
1712 }
1713
1714 for u in entry.references().items() {
1715 match self.entry_by_id.get_mut(u) {
1716 Some(metadata) => metadata.referenced_by.push(entry.id()),
1717 None => panic!(
1718 "Catalog: missing dependent catalog item {} while installing {}",
1719 &u,
1720 self.resolve_full_name(entry.name(), entry.conn_id())
1721 ),
1722 }
1723 }
1724 for u in entry.uses() {
1725 if u == entry.id() {
1728 continue;
1729 }
1730 match self.entry_by_id.get_mut(&u) {
1731 Some(metadata) => metadata.used_by.push(entry.id()),
1732 None => panic!(
1733 "Catalog: missing dependent catalog item {} while installing {}",
1734 &u,
1735 self.resolve_full_name(entry.name(), entry.conn_id())
1736 ),
1737 }
1738 }
1739 for gid in entry.item.global_ids() {
1740 self.entry_by_global_id.insert(gid, entry.id());
1741 }
1742 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1743 if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
1746 && !self.temporary_schemas.contains_key(conn_id)
1747 {
1748 self.create_temporary_schema(conn_id, entry.owner_id)
1749 .expect("failed to create temporary schema");
1750 }
1751 let schema = self.get_schema_mut(
1752 &entry.name().qualifiers.database_spec,
1753 &entry.name().qualifiers.schema_spec,
1754 conn_id,
1755 );
1756
1757 let prev_id = match entry.item() {
1758 CatalogItem::Func(_) => schema
1759 .functions
1760 .insert(entry.name().item.clone(), entry.id()),
1761 CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1762 _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1763 };
1764
1765 assert!(
1766 prev_id.is_none(),
1767 "builtin name collision on {:?}",
1768 entry.name().item.clone()
1769 );
1770
1771 self.entry_by_id.insert(entry.id(), entry.clone());
1772 }
1773
1774 fn insert_item(
1776 &mut self,
1777 id: CatalogItemId,
1778 oid: u32,
1779 name: QualifiedItemName,
1780 item: CatalogItem,
1781 owner_id: RoleId,
1782 privileges: PrivilegeMap,
1783 ) {
1784 let entry = CatalogEntry {
1785 item,
1786 name,
1787 id,
1788 oid,
1789 used_by: Vec::new(),
1790 referenced_by: Vec::new(),
1791 owner_id,
1792 privileges,
1793 };
1794
1795 self.insert_entry(entry);
1796 }
1797
    /// Removes the item with `id` from every in-memory index and returns its
    /// entry; the inverse of `insert_entry`.
    ///
    /// Panics if the item, or any index entry derived from it, is missing.
    #[mz_ore::instrument(level = "trace")]
    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
        // Remove the reverse dependency edges that point back at this item.
        for u in metadata.references().items() {
            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
            }
        }
        for u in metadata.uses() {
            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
                dep_metadata.used_by.retain(|u| *u != metadata.id())
            }
        }
        for gid in metadata.global_ids() {
            self.entry_by_global_id.remove(&gid);
        }

        // Unregister the item's name from its schema. Types have their own
        // namespace; the assert shows funcs are never dropped via this path.
        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema = self.get_schema_mut(
            &metadata.name().qualifiers.database_spec,
            &metadata.name().qualifiers.schema_spec,
            conn_id,
        );
        if metadata.item_type() == CatalogItemType::Type {
            schema
                .types
                .remove(&metadata.name().item)
                .expect("catalog out of sync");
        } else {
            assert_ne!(metadata.item_type(), CatalogItemType::Func);

            schema
                .items
                .remove(&metadata.name().item)
                .expect("catalog out of sync");
        };

        // Unbind the item from its cluster, if it was bound to one.
        if !id.is_system() {
            if let Some(cluster_id) = metadata.item().cluster_id() {
                assert!(
                    self.clusters_by_id
                        .get_mut(&cluster_id)
                        .expect("catalog out of sync")
                        .bound_objects
                        .remove(&id),
                    "catalog out of sync"
                );
            }
        }

        metadata
    }
1852
1853 fn insert_introspection_source_index(
1854 &mut self,
1855 cluster_id: ClusterId,
1856 log: &'static BuiltinLog,
1857 item_id: CatalogItemId,
1858 global_id: GlobalId,
1859 oid: u32,
1860 ) {
1861 let (index_name, index) =
1862 self.create_introspection_source_index(cluster_id, log, global_id);
1863 self.insert_item(
1864 item_id,
1865 oid,
1866 index_name,
1867 index,
1868 MZ_SYSTEM_ROLE_ID,
1869 PrivilegeMap::default(),
1870 );
1871 }
1872
1873 fn create_introspection_source_index(
1874 &self,
1875 cluster_id: ClusterId,
1876 log: &'static BuiltinLog,
1877 global_id: GlobalId,
1878 ) -> (QualifiedItemName, CatalogItem) {
1879 let source_name = FullItemName {
1880 database: RawDatabaseSpecifier::Ambient,
1881 schema: log.schema.into(),
1882 item: log.name.into(),
1883 };
1884 let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1885 let mut index_name = QualifiedItemName {
1886 qualifiers: ItemQualifiers {
1887 database_spec: ResolvedDatabaseSpecifier::Ambient,
1888 schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1889 },
1890 item: index_name.clone(),
1891 };
1892 index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1893 let index_item_name = index_name.item.clone();
1894 let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1895 let index = CatalogItem::Index(Index {
1896 global_id,
1897 on: log_global_id,
1898 keys: log
1899 .variant
1900 .index_by()
1901 .into_iter()
1902 .map(MirScalarExpr::column)
1903 .collect(),
1904 create_sql: index_sql(
1905 index_item_name,
1906 cluster_id,
1907 source_name,
1908 &log.variant.desc(),
1909 &log.variant.index_by(),
1910 ),
1911 conn_id: None,
1912 resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1913 cluster_id,
1914 is_retained_metrics_object: false,
1915 custom_logical_compaction_window: None,
1916 });
1917 (index_name, index)
1918 }
1919
1920 fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1925 Ok(Arc::make_mut(&mut self.system_configuration).set(name, value)?)
1926 }
1927
1928 fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1933 Ok(Arc::make_mut(&mut self.system_configuration).reset(name)?)
1934 }
1935}
1936
/// Sorts `updates` into an order in which they can be safely applied to the
/// in-memory catalog: all retractions first (in reverse dependency order),
/// then all additions (in dependency order).
///
/// All updates are expected to share a single timestamp and be consolidated;
/// both properties are soft-asserted below.
fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
    /// Routes `update` into the retraction or addition bucket for its group.
    fn push_update<T>(
        update: T,
        diff: StateDiff,
        retractions: &mut Vec<T>,
        additions: &mut Vec<T>,
    ) {
        match diff {
            StateDiff::Retraction => retractions.push(update),
            StateDiff::Addition => additions.push(update),
        }
    }

    soft_assert_no_log!(
        updates.iter().map(|update| update.ts).all_equal(),
        "all timestamps should be equal: {updates:?}"
    );
    soft_assert_no_log!(
        {
            let mut dedup = BTreeSet::new();
            updates.iter().all(|update| dedup.insert(&update.kind))
        },
        "updates should be consolidated: {updates:?}"
    );

    // Buckets, listed here in addition order (retractions are later chained
    // in the reverse order): pre-cluster -> other builtins -> clusters ->
    // builtin indexes -> items -> post-item.
    let mut pre_cluster_retractions = Vec::new();
    let mut pre_cluster_additions = Vec::new();
    let mut cluster_retractions = Vec::new();
    let mut cluster_additions = Vec::new();
    let mut builtin_item_updates = Vec::new();
    let mut item_retractions = Vec::new();
    let mut item_additions = Vec::new();
    let mut temp_item_retractions = Vec::new();
    let mut temp_item_additions = Vec::new();
    let mut post_item_retractions = Vec::new();
    let mut post_item_additions = Vec::new();
    for update in updates {
        let diff = update.diff.clone();
        match update.kind {
            // Updates that items and clusters may depend on.
            StateUpdateKind::Role(_)
            | StateUpdateKind::RoleAuth(_)
            | StateUpdateKind::Database(_)
            | StateUpdateKind::Schema(_)
            | StateUpdateKind::DefaultPrivilege(_)
            | StateUpdateKind::SystemPrivilege(_)
            | StateUpdateKind::SystemConfiguration(_)
            | StateUpdateKind::NetworkPolicy(_) => push_update(
                update,
                diff,
                &mut pre_cluster_retractions,
                &mut pre_cluster_additions,
            ),
            StateUpdateKind::Cluster(_)
            | StateUpdateKind::IntrospectionSourceIndex(_)
            | StateUpdateKind::ClusterReplica(_) => push_update(
                update,
                diff,
                &mut cluster_retractions,
                &mut cluster_additions,
            ),
            // Builtin items are bucketed further below, after being sorted
            // into builtin definition order.
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
            }
            StateUpdateKind::TemporaryItem(item) => push_update(
                (item, update.ts, update.diff),
                diff,
                &mut temp_item_retractions,
                &mut temp_item_additions,
            ),
            StateUpdateKind::Item(item) => push_update(
                (item, update.ts, update.diff),
                diff,
                &mut item_retractions,
                &mut item_additions,
            ),
            // Updates that may reference items and so must come last.
            StateUpdateKind::Comment(_)
            | StateUpdateKind::SourceReferences(_)
            | StateUpdateKind::AuditLog(_)
            | StateUpdateKind::StorageCollectionMetadata(_)
            | StateUpdateKind::UnfinalizedShard(_) => push_update(
                update,
                diff,
                &mut post_item_retractions,
                &mut post_item_additions,
            ),
        }
    }

    // Sort builtin item updates by their position in the static builtin
    // definition order.
    let builtin_item_updates = builtin_item_updates
        .into_iter()
        .map(|(system_object_mapping, ts, diff)| {
            let idx = BUILTIN_LOOKUP
                .get(&system_object_mapping.description)
                .expect("missing builtin")
                .0;
            (idx, system_object_mapping, ts, diff)
        })
        .sorted_by_key(|(idx, _, _, _)| *idx)
        .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));

    // Indexes and continual tasks are applied after cluster updates (they
    // live on clusters); all other builtin items are applied before.
    let mut other_builtin_retractions = Vec::new();
    let mut other_builtin_additions = Vec::new();
    let mut builtin_index_retractions = Vec::new();
    let mut builtin_index_additions = Vec::new();
    for (builtin_item_update, ts, diff) in builtin_item_updates {
        match &builtin_item_update.description.object_type {
            CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
                StateUpdate {
                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
                    ts,
                    diff,
                },
                diff,
                &mut builtin_index_retractions,
                &mut builtin_index_additions,
            ),
            CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::Sink
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Type
            | CatalogItemType::Func
            | CatalogItemType::Secret
            | CatalogItemType::Connection => push_update(
                StateUpdate {
                    kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
                    ts,
                    diff,
                },
                diff,
                &mut other_builtin_retractions,
                &mut other_builtin_additions,
            ),
        }
    }

    /// Sorts `items` so that every item appears after its dependencies, as
    /// determined by parsing each item's `create_sql`.
    fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
        tracing::debug!(?items, "sorting items by dependencies");

        let key_fn = |item: &(mz_catalog::durable::Item, _, _)| item.0.id;
        let dependencies_fn = |item: &(mz_catalog::durable::Item, _, _)| {
            let statement = mz_sql::parse::parse(&item.0.create_sql)
                .expect("valid create_sql")
                .into_element()
                .ast;
            mz_sql::names::dependencies(&statement).expect("failed to find dependencies of item")
        };
        sort_topological(items, key_fn, dependencies_fn);
    }

    /// Orders durable item updates by item type (types, funcs, secrets,
    /// connections, sources, tables, derived items, sinks, continual tasks),
    /// topologically within connections and derived items (which can depend
    /// on one another) and by id within every other group.
    fn sort_item_updates(
        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in item_updates {
            match update.0.item_type() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        sort_items_topological(&mut connections);
        sort_items_topological(&mut derived_items);

        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut sources,
            &mut tables,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }

    let item_retractions = sort_item_updates(item_retractions);
    let item_additions = sort_item_updates(item_additions);

    /// Orders temporary item updates like `sort_item_updates`, except every
    /// group — including connections and derived items — is sorted by id.
    /// NOTE(review): presumably id order is a valid dependency order for
    /// temporary items, so no topological sort is needed here — confirm.
    fn sort_temp_item_updates(
        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in temp_item_updates {
            match update.0.item_type() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut connections,
            &mut sources,
            &mut tables,
            &mut derived_items,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }
    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
    let temp_item_additions = sort_temp_item_updates(temp_item_additions);

    /// Merges two already-sorted item update streams into a single stream of
    /// `StateUpdate`s, ordered by item id. Panics if a durable and temporary
    /// item share an id.
    fn merge_item_updates(
        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> Vec<StateUpdate> {
        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());

        while let (Some((item, _, _)), Some((temp_item, _, _))) =
            (item_updates.front(), temp_item_updates.front())
        {
            if item.id < temp_item.id {
                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::Item(item),
                    ts,
                    diff,
                });
            } else if item.id > temp_item.id {
                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::TemporaryItem(temp_item),
                    ts,
                    diff,
                });
            } else {
                unreachable!(
                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
                );
            }
        }

        // Drain whichever stream is left over.
        while let Some((item, ts, diff)) = item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::Item(item),
                ts,
                diff,
            });
        }

        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::TemporaryItem(temp_item),
                ts,
                diff,
            });
        }

        state_updates
    }
    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
    let item_additions = merge_item_updates(item_additions, temp_item_additions);

    // Retractions run in exactly the reverse of addition order so dependents
    // are always removed before their dependencies.
    iter::empty()
        .chain(post_item_retractions.into_iter().rev())
        .chain(item_retractions.into_iter().rev())
        .chain(builtin_index_retractions.into_iter().rev())
        .chain(cluster_retractions.into_iter().rev())
        .chain(other_builtin_retractions.into_iter().rev())
        .chain(pre_cluster_retractions.into_iter().rev())
        .chain(pre_cluster_additions)
        .chain(other_builtin_additions)
        .chain(cluster_additions)
        .chain(builtin_index_additions)
        .chain(item_additions)
        .chain(post_item_additions)
        .collect()
}
2320
/// Accumulates consecutive state updates of the same "shape" so they can be
/// applied to the catalog as one batch.
enum ApplyState {
    /// Builtin view additions, which are parsed and installed together.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Item updates, applied with item-parsing configuration enabled.
    Items(Vec<StateUpdate>),
    /// All other updates, applied directly.
    Updates(Vec<StateUpdate>),
}
2337
2338impl ApplyState {
2339 fn new(update: StateUpdate) -> Self {
2340 use StateUpdateKind::*;
2341 match &update.kind {
2342 SystemObjectMapping(som)
2343 if som.description.object_type == CatalogItemType::View
2344 && update.diff == StateDiff::Addition =>
2345 {
2346 let view_addition = lookup_builtin_view_addition(som.clone());
2347 Self::BuiltinViewAdditions(vec![view_addition])
2348 }
2349
2350 IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
2351 Self::Items(vec![update])
2352 }
2353
2354 Role(_)
2355 | RoleAuth(_)
2356 | Database(_)
2357 | Schema(_)
2358 | DefaultPrivilege(_)
2359 | SystemPrivilege(_)
2360 | SystemConfiguration(_)
2361 | Cluster(_)
2362 | NetworkPolicy(_)
2363 | ClusterReplica(_)
2364 | SourceReferences(_)
2365 | Comment(_)
2366 | AuditLog(_)
2367 | StorageCollectionMetadata(_)
2368 | UnfinalizedShard(_) => Self::Updates(vec![update]),
2369 }
2370 }
2371
    /// Applies the accumulated batch to `state`, returning the generated
    /// builtin table updates and parsed state updates.
    async fn apply(
        self,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
        Vec<ParsedStateUpdate>,
    ) {
        match self {
            Self::BuiltinViewAdditions(builtin_view_additions) => {
                // Temporarily enable item-parsing system configuration while
                // parsing the builtin views, restoring the previous
                // configuration afterwards.
                let restore = Arc::clone(&state.system_configuration);
                Arc::make_mut(&mut state.system_configuration).enable_for_item_parsing();
                let builtin_table_updates = CatalogState::parse_builtin_views(
                    state,
                    builtin_view_additions,
                    retractions,
                    local_expression_cache,
                )
                .await;
                state.system_configuration = restore;
                (builtin_table_updates, Vec::new())
            }
            // Item updates are applied with item-parsing settings enabled.
            Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
                state
                    .apply_updates_inner(updates, retractions, local_expression_cache)
                    .expect("corrupt catalog")
            }),
            // All other updates are applied directly.
            Self::Updates(updates) => state
                .apply_updates_inner(updates, retractions, local_expression_cache)
                .expect("corrupt catalog"),
        }
    }
2410
2411 async fn step(
2412 self,
2413 next: Self,
2414 state: &mut CatalogState,
2415 retractions: &mut InProgressRetractions,
2416 local_expression_cache: &mut LocalExpressionCache,
2417 ) -> (
2418 Self,
2419 (
2420 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2421 Vec<ParsedStateUpdate>,
2422 ),
2423 ) {
2424 match (self, next) {
2425 (
2426 Self::BuiltinViewAdditions(mut builtin_view_additions),
2427 Self::BuiltinViewAdditions(next_builtin_view_additions),
2428 ) => {
2429 builtin_view_additions.extend(next_builtin_view_additions);
2431 (
2432 Self::BuiltinViewAdditions(builtin_view_additions),
2433 (Vec::new(), Vec::new()),
2434 )
2435 }
2436 (Self::Items(mut updates), Self::Items(next_updates)) => {
2437 updates.extend(next_updates);
2439 (Self::Items(updates), (Vec::new(), Vec::new()))
2440 }
2441 (Self::Updates(mut updates), Self::Updates(next_updates)) => {
2442 updates.extend(next_updates);
2444 (Self::Updates(updates), (Vec::new(), Vec::new()))
2445 }
2446 (apply_state, next_apply_state) => {
2447 let updates = apply_state
2449 .apply(state, retractions, local_expression_cache)
2450 .await;
2451 (next_apply_state, updates)
2452 }
2453 }
2454 }
2455}
2456
/// A map-like collection supporting insertion and removal, letting the apply
/// helpers below operate uniformly over both `BTreeMap` and `imbl::OrdMap`.
trait MutableMap<K, V> {
    /// Inserts `value` at `key`, returning the previously stored value, if
    /// any.
    fn insert(&mut self, key: K, value: V) -> Option<V>;
    /// Removes the value at `key`, returning it if it was present.
    fn remove(&mut self, key: &K) -> Option<V>;
}
2463
impl<K: Ord, V> MutableMap<K, V> for BTreeMap<K, V> {
    fn insert(&mut self, key: K, value: V) -> Option<V> {
        // Fully-qualified call makes explicit that the inherent `BTreeMap`
        // method is meant, not this trait method.
        BTreeMap::insert(self, key, value)
    }
    fn remove(&mut self, key: &K) -> Option<V> {
        BTreeMap::remove(self, key)
    }
}
2472
impl<K: Ord + Clone, V: Clone> MutableMap<K, V> for imbl::OrdMap<K, V> {
    fn insert(&mut self, key: K, value: V) -> Option<V> {
        // Fully-qualified call makes explicit that the inherent `OrdMap`
        // method is meant, not this trait method.
        imbl::OrdMap::insert(self, key, value)
    }
    fn remove(&mut self, key: &K) -> Option<V> {
        imbl::OrdMap::remove(self, key)
    }
}
2481
2482fn apply_inverted_lookup<K, V>(map: &mut impl MutableMap<K, V>, key: &K, value: V, diff: StateDiff)
2487where
2488 K: Ord + Clone + Debug,
2489 V: PartialEq + Debug,
2490{
2491 match diff {
2492 StateDiff::Retraction => {
2493 let prev = map.remove(key);
2494 assert_eq!(
2495 prev,
2496 Some(value),
2497 "retraction does not match existing value: {key:?}"
2498 );
2499 }
2500 StateDiff::Addition => {
2501 let prev = map.insert(key.clone(), value);
2502 assert_eq!(
2503 prev, None,
2504 "values must be explicitly retracted before inserting a new value: {key:?}"
2505 );
2506 }
2507 }
2508}
2509
2510fn apply_with_update<K, V, D>(
2513 map: &mut impl MutableMap<K, V>,
2514 durable: D,
2515 key_fn: impl FnOnce(&D) -> K,
2516 diff: StateDiff,
2517 retractions: &mut BTreeMap<D::Key, V>,
2518) where
2519 K: Ord,
2520 V: UpdateFrom<D> + PartialEq + Debug,
2521 D: DurableType,
2522 D::Key: Ord,
2523{
2524 match diff {
2525 StateDiff::Retraction => {
2526 let mem_key = key_fn(&durable);
2527 let value = map
2528 .remove(&mem_key)
2529 .expect("retraction does not match existing value: {key:?}");
2530 let durable_key = durable.into_key_value().0;
2531 retractions.insert(durable_key, value);
2532 }
2533 StateDiff::Addition => {
2534 let mem_key = key_fn(&durable);
2535 let durable_key = durable.key();
2536 let value = match retractions.remove(&durable_key) {
2537 Some(mut retraction) => {
2538 retraction.update_from(durable);
2539 retraction
2540 }
2541 None => durable.into(),
2542 };
2543 let prev = map.insert(mem_key, value);
2544 assert_eq!(
2545 prev, None,
2546 "values must be explicitly retracted before inserting a new value"
2547 );
2548 }
2549 }
2550}
2551
2552fn lookup_builtin_view_addition(
2554 mapping: SystemObjectMapping,
2555) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2556 let (_, builtin) = BUILTIN_LOOKUP
2557 .get(&mapping.description)
2558 .expect("missing builtin view");
2559 let Builtin::View(view) = builtin else {
2560 unreachable!("programming error, expected BuiltinView found {builtin:?}");
2561 };
2562
2563 (
2564 view,
2565 mapping.unique_identifier.catalog_id,
2566 mapping.unique_identifier.global_id,
2567 )
2568}