1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25 BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28 ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29 SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
35 NetworkPolicy, Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36 TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
40use mz_controller_types::ClusterId;
41use mz_expr::MirScalarExpr;
42use mz_ore::collections::CollectionExt;
43use mz_ore::tracing::OpenTelemetryContext;
44use mz_ore::{assert_none, instrument, soft_assert_no_log};
45use mz_pgrepr::oid::INVALID_OID;
46use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
47use mz_repr::role_id::RoleId;
48use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
49use mz_sql::catalog::CatalogError as SqlCatalogError;
50use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
51use mz_sql::names::{
52 FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
53 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
54};
55use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
56use mz_sql::session::vars::{VarError, VarInput};
57use mz_sql::{plan, rbac};
58use mz_sql_parser::ast::Expr;
59use mz_storage_types::sources::Timeline;
60use tracing::{info_span, warn};
61
62use crate::AdapterError;
63use crate::catalog::state::LocalExpressionCache;
64use crate::catalog::{BuiltinTableUpdate, CatalogState};
65use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
66use crate::util::index_sql;
67
68#[derive(Debug, Clone, Default)]
80struct InProgressRetractions {
81 roles: BTreeMap<RoleKey, Role>,
82 role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
83 databases: BTreeMap<DatabaseKey, Database>,
84 schemas: BTreeMap<SchemaKey, Schema>,
85 clusters: BTreeMap<ClusterKey, Cluster>,
86 network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
87 items: BTreeMap<ItemKey, CatalogEntry>,
88 temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
89 introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
90 system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
91}
92
93impl CatalogState {
94 #[must_use]
98 #[instrument]
99 pub(crate) async fn apply_updates(
100 &mut self,
101 updates: Vec<StateUpdate>,
102 local_expression_cache: &mut LocalExpressionCache,
103 ) -> (
104 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
105 Vec<ParsedStateUpdate>,
106 ) {
107 let mut builtin_table_updates = Vec::with_capacity(updates.len());
108 let mut catalog_updates = Vec::with_capacity(updates.len());
109
110 let updates = Self::consolidate_updates(updates);
115
116 let mut groups: Vec<Vec<_>> = Vec::new();
118 for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
119 let updates = sort_updates(updates.collect());
123 groups.push(updates);
124 }
125
126 for updates in groups {
127 let mut apply_state = ApplyState::Updates(Vec::new());
128 let mut retractions = InProgressRetractions::default();
129
130 for update in updates {
131 let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
132 .step(
133 ApplyState::new(update),
134 self,
135 &mut retractions,
136 local_expression_cache,
137 )
138 .await;
139 apply_state = next_apply_state;
140 builtin_table_updates.extend(builtin_table_update);
141 catalog_updates.extend(catalog_update);
142 }
143
144 let (builtin_table_update, catalog_update) = apply_state
146 .apply(self, &mut retractions, local_expression_cache)
147 .await;
148 builtin_table_updates.extend(builtin_table_update);
149 catalog_updates.extend(catalog_update);
150 }
151
152 (builtin_table_updates, catalog_updates)
153 }
154
155 fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
163 let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
164 .into_iter()
165 .map(|update| (update.kind, update.ts, update.diff.into()))
166 .collect_vec();
167
168 consolidate_updates(&mut updates);
169
170 updates
171 .into_iter()
172 .map(|(kind, ts, diff)| StateUpdate {
173 kind,
174 ts,
175 diff: diff
176 .try_into()
177 .expect("catalog state cannot have diff other than -1 or 1"),
178 })
179 .collect_vec()
180 }
181
182 #[instrument(level = "debug")]
183 fn apply_updates_inner(
184 &mut self,
185 updates: Vec<StateUpdate>,
186 retractions: &mut InProgressRetractions,
187 local_expression_cache: &mut LocalExpressionCache,
188 ) -> Result<
189 (
190 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
191 Vec<ParsedStateUpdate>,
192 ),
193 CatalogError,
194 > {
195 soft_assert_no_log!(
196 updates.iter().map(|update| update.ts).all_equal(),
197 "all timestamps should be equal: {updates:?}"
198 );
199
200 let mut update_system_config = false;
201
202 let mut builtin_table_updates = Vec::with_capacity(updates.len());
203 let mut catalog_updates = Vec::new();
204
205 for state_update in updates {
206 if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
207 update_system_config = true;
208 }
209
210 match state_update.diff {
211 StateDiff::Retraction => {
212 if let Some(update) =
216 parsed_state_updates::parse_state_update(self, state_update.clone())
217 {
218 catalog_updates.push(update);
219 }
220
221 builtin_table_updates.extend(self.generate_builtin_table_update(
224 state_update.kind.clone(),
225 state_update.diff,
226 ));
227 self.apply_update(
228 state_update.kind,
229 state_update.diff,
230 retractions,
231 local_expression_cache,
232 )?;
233 }
234 StateDiff::Addition => {
235 self.apply_update(
236 state_update.kind.clone(),
237 state_update.diff,
238 retractions,
239 local_expression_cache,
240 )?;
241 builtin_table_updates.extend(self.generate_builtin_table_update(
245 state_update.kind.clone(),
246 state_update.diff,
247 ));
248
249 if let Some(update) =
252 parsed_state_updates::parse_state_update(self, state_update.clone())
253 {
254 catalog_updates.push(update);
255 }
256 }
257 }
258 }
259
260 if update_system_config {
261 self.system_configuration.dyncfg_updates();
262 }
263
264 Ok((builtin_table_updates, catalog_updates))
265 }
266
267 #[instrument(level = "debug")]
268 fn apply_update(
269 &mut self,
270 kind: StateUpdateKind,
271 diff: StateDiff,
272 retractions: &mut InProgressRetractions,
273 local_expression_cache: &mut LocalExpressionCache,
274 ) -> Result<(), CatalogError> {
275 match kind {
276 StateUpdateKind::Role(role) => {
277 self.apply_role_update(role, diff, retractions);
278 }
279 StateUpdateKind::RoleAuth(role_auth) => {
280 self.apply_role_auth_update(role_auth, diff, retractions);
281 }
282 StateUpdateKind::Database(database) => {
283 self.apply_database_update(database, diff, retractions);
284 }
285 StateUpdateKind::Schema(schema) => {
286 self.apply_schema_update(schema, diff, retractions);
287 }
288 StateUpdateKind::DefaultPrivilege(default_privilege) => {
289 self.apply_default_privilege_update(default_privilege, diff, retractions);
290 }
291 StateUpdateKind::SystemPrivilege(system_privilege) => {
292 self.apply_system_privilege_update(system_privilege, diff, retractions);
293 }
294 StateUpdateKind::SystemConfiguration(system_configuration) => {
295 self.apply_system_configuration_update(system_configuration, diff, retractions);
296 }
297 StateUpdateKind::Cluster(cluster) => {
298 self.apply_cluster_update(cluster, diff, retractions);
299 }
300 StateUpdateKind::NetworkPolicy(network_policy) => {
301 self.apply_network_policy_update(network_policy, diff, retractions);
302 }
303 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
304 self.apply_introspection_source_index_update(
305 introspection_source_index,
306 diff,
307 retractions,
308 );
309 }
310 StateUpdateKind::ClusterReplica(cluster_replica) => {
311 self.apply_cluster_replica_update(cluster_replica, diff, retractions);
312 }
313 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
314 self.apply_system_object_mapping_update(
315 system_object_mapping,
316 diff,
317 retractions,
318 local_expression_cache,
319 );
320 }
321 StateUpdateKind::TemporaryItem(item) => {
322 self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
323 }
324 StateUpdateKind::Item(item) => {
325 self.apply_item_update(item, diff, retractions, local_expression_cache)?;
326 }
327 StateUpdateKind::Comment(comment) => {
328 self.apply_comment_update(comment, diff, retractions);
329 }
330 StateUpdateKind::SourceReferences(source_reference) => {
331 self.apply_source_references_update(source_reference, diff, retractions);
332 }
333 StateUpdateKind::AuditLog(_audit_log) => {
334 }
336 StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
337 self.apply_storage_collection_metadata_update(
338 storage_collection_metadata,
339 diff,
340 retractions,
341 );
342 }
343 StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
344 self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
345 }
346 }
347
348 Ok(())
349 }
350
351 #[instrument(level = "debug")]
352 fn apply_role_auth_update(
353 &mut self,
354 role_auth: mz_catalog::durable::RoleAuth,
355 diff: StateDiff,
356 retractions: &mut InProgressRetractions,
357 ) {
358 apply_with_update(
359 &mut self.role_auth_by_id,
360 role_auth,
361 |role_auth| role_auth.role_id,
362 diff,
363 &mut retractions.role_auths,
364 );
365 }
366
367 #[instrument(level = "debug")]
368 fn apply_role_update(
369 &mut self,
370 role: mz_catalog::durable::Role,
371 diff: StateDiff,
372 retractions: &mut InProgressRetractions,
373 ) {
374 apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
375 apply_with_update(
376 &mut self.roles_by_id,
377 role,
378 |role| role.id,
379 diff,
380 &mut retractions.roles,
381 );
382 }
383
384 #[instrument(level = "debug")]
385 fn apply_database_update(
386 &mut self,
387 database: mz_catalog::durable::Database,
388 diff: StateDiff,
389 retractions: &mut InProgressRetractions,
390 ) {
391 apply_inverted_lookup(
392 &mut self.database_by_name,
393 &database.name,
394 database.id,
395 diff,
396 );
397 apply_with_update(
398 &mut self.database_by_id,
399 database,
400 |database| database.id,
401 diff,
402 &mut retractions.databases,
403 );
404 }
405
406 #[instrument(level = "debug")]
407 fn apply_schema_update(
408 &mut self,
409 schema: mz_catalog::durable::Schema,
410 diff: StateDiff,
411 retractions: &mut InProgressRetractions,
412 ) {
413 let (schemas_by_id, schemas_by_name) = match &schema.database_id {
414 Some(database_id) => {
415 let db = self
416 .database_by_id
417 .get_mut(database_id)
418 .expect("catalog out of sync");
419 (&mut db.schemas_by_id, &mut db.schemas_by_name)
420 }
421 None => (
422 &mut self.ambient_schemas_by_id,
423 &mut self.ambient_schemas_by_name,
424 ),
425 };
426 apply_inverted_lookup(schemas_by_name, &schema.name, schema.id, diff);
427 apply_with_update(
428 schemas_by_id,
429 schema,
430 |schema| schema.id,
431 diff,
432 &mut retractions.schemas,
433 );
434 }
435
436 #[instrument(level = "debug")]
437 fn apply_default_privilege_update(
438 &mut self,
439 default_privilege: mz_catalog::durable::DefaultPrivilege,
440 diff: StateDiff,
441 _retractions: &mut InProgressRetractions,
442 ) {
443 match diff {
444 StateDiff::Addition => self
445 .default_privileges
446 .grant(default_privilege.object, default_privilege.acl_item),
447 StateDiff::Retraction => self
448 .default_privileges
449 .revoke(&default_privilege.object, &default_privilege.acl_item),
450 }
451 }
452
453 #[instrument(level = "debug")]
454 fn apply_system_privilege_update(
455 &mut self,
456 system_privilege: MzAclItem,
457 diff: StateDiff,
458 _retractions: &mut InProgressRetractions,
459 ) {
460 match diff {
461 StateDiff::Addition => self.system_privileges.grant(system_privilege),
462 StateDiff::Retraction => self.system_privileges.revoke(&system_privilege),
463 }
464 }
465
466 #[instrument(level = "debug")]
467 fn apply_system_configuration_update(
468 &mut self,
469 system_configuration: mz_catalog::durable::SystemConfiguration,
470 diff: StateDiff,
471 _retractions: &mut InProgressRetractions,
472 ) {
473 let res = match diff {
474 StateDiff::Addition => self.insert_system_configuration(
475 &system_configuration.name,
476 VarInput::Flat(&system_configuration.value),
477 ),
478 StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
479 };
480 match res {
481 Ok(_) => (),
482 Err(Error {
486 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
487 }) => {
488 warn!(%name, "unknown system parameter from catalog storage");
489 }
490 Err(e) => panic!("unable to update system variable: {e:?}"),
491 }
492 }
493
494 #[instrument(level = "debug")]
495 fn apply_cluster_update(
496 &mut self,
497 cluster: mz_catalog::durable::Cluster,
498 diff: StateDiff,
499 retractions: &mut InProgressRetractions,
500 ) {
501 apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
502 apply_with_update(
503 &mut self.clusters_by_id,
504 cluster,
505 |cluster| cluster.id,
506 diff,
507 &mut retractions.clusters,
508 );
509 }
510
511 #[instrument(level = "debug")]
512 fn apply_network_policy_update(
513 &mut self,
514 policy: mz_catalog::durable::NetworkPolicy,
515 diff: StateDiff,
516 retractions: &mut InProgressRetractions,
517 ) {
518 apply_inverted_lookup(
519 &mut self.network_policies_by_name,
520 &policy.name,
521 policy.id,
522 diff,
523 );
524 apply_with_update(
525 &mut self.network_policies_by_id,
526 policy,
527 |policy| policy.id,
528 diff,
529 &mut retractions.network_policies,
530 );
531 }
532
533 #[instrument(level = "debug")]
534 fn apply_introspection_source_index_update(
535 &mut self,
536 introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
537 diff: StateDiff,
538 retractions: &mut InProgressRetractions,
539 ) {
540 let cluster = self
541 .clusters_by_id
542 .get_mut(&introspection_source_index.cluster_id)
543 .expect("catalog out of sync");
544 let log = BUILTIN_LOG_LOOKUP
545 .get(introspection_source_index.name.as_str())
546 .expect("missing log");
547 apply_inverted_lookup(
548 &mut cluster.log_indexes,
549 &log.variant,
550 introspection_source_index.index_id,
551 diff,
552 );
553
554 match diff {
555 StateDiff::Addition => {
556 if let Some(mut entry) = retractions
557 .introspection_source_indexes
558 .remove(&introspection_source_index.item_id)
559 {
560 let (index_name, index) = self.create_introspection_source_index(
563 introspection_source_index.cluster_id,
564 log,
565 introspection_source_index.index_id,
566 );
567 assert_eq!(entry.id, introspection_source_index.item_id);
568 assert_eq!(entry.oid, introspection_source_index.oid);
569 assert_eq!(entry.name, index_name);
570 entry.item = index;
571 self.insert_entry(entry);
572 } else {
573 self.insert_introspection_source_index(
574 introspection_source_index.cluster_id,
575 log,
576 introspection_source_index.item_id,
577 introspection_source_index.index_id,
578 introspection_source_index.oid,
579 );
580 }
581 }
582 StateDiff::Retraction => {
583 let entry = self.drop_item(introspection_source_index.item_id);
584 retractions
585 .introspection_source_indexes
586 .insert(entry.id, entry);
587 }
588 }
589 }
590
591 #[instrument(level = "debug")]
592 fn apply_cluster_replica_update(
593 &mut self,
594 cluster_replica: mz_catalog::durable::ClusterReplica,
595 diff: StateDiff,
596 _retractions: &mut InProgressRetractions,
597 ) {
598 let cluster = self
599 .clusters_by_id
600 .get(&cluster_replica.cluster_id)
601 .expect("catalog out of sync");
602 let azs = cluster.availability_zones();
603 let location = self
604 .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
605 .expect("catalog in unexpected state");
606 let cluster = self
607 .clusters_by_id
608 .get_mut(&cluster_replica.cluster_id)
609 .expect("catalog out of sync");
610 apply_inverted_lookup(
611 &mut cluster.replica_id_by_name_,
612 &cluster_replica.name,
613 cluster_replica.replica_id,
614 diff,
615 );
616 match diff {
617 StateDiff::Retraction => {
618 let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
619 assert!(
620 prev.is_some(),
621 "retraction does not match existing value: {:?}",
622 cluster_replica.replica_id
623 );
624 }
625 StateDiff::Addition => {
626 let logging = ReplicaLogging {
627 log_logging: cluster_replica.config.logging.log_logging,
628 interval: cluster_replica.config.logging.interval,
629 };
630 let config = ReplicaConfig {
631 location,
632 compute: ComputeReplicaConfig { logging },
633 };
634 let mem_cluster_replica = ClusterReplica {
635 name: cluster_replica.name.clone(),
636 cluster_id: cluster_replica.cluster_id,
637 replica_id: cluster_replica.replica_id,
638 config,
639 owner_id: cluster_replica.owner_id,
640 };
641 let prev = cluster
642 .replicas_by_id_
643 .insert(cluster_replica.replica_id, mem_cluster_replica);
644 assert_eq!(
645 prev, None,
646 "values must be explicitly retracted before inserting a new value: {:?}",
647 cluster_replica.replica_id
648 );
649 }
650 }
651 }
652
653 #[instrument(level = "debug")]
654 fn apply_system_object_mapping_update(
655 &mut self,
656 system_object_mapping: mz_catalog::durable::SystemObjectMapping,
657 diff: StateDiff,
658 retractions: &mut InProgressRetractions,
659 local_expression_cache: &mut LocalExpressionCache,
660 ) {
661 let item_id = system_object_mapping.unique_identifier.catalog_id;
662 let global_id = system_object_mapping.unique_identifier.global_id;
663
664 if system_object_mapping.unique_identifier.runtime_alterable() {
665 return;
669 }
670
671 if let StateDiff::Retraction = diff {
672 let entry = self.drop_item(item_id);
673 retractions.system_object_mappings.insert(item_id, entry);
674 return;
675 }
676
677 if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
678 self.insert_entry(entry);
683 return;
684 }
685
686 let builtin = BUILTIN_LOOKUP
687 .get(&system_object_mapping.description)
688 .expect("missing builtin")
689 .1;
690 let schema_name = builtin.schema();
691 let schema_id = self
692 .ambient_schemas_by_name
693 .get(schema_name)
694 .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
695 let name = QualifiedItemName {
696 qualifiers: ItemQualifiers {
697 database_spec: ResolvedDatabaseSpecifier::Ambient,
698 schema_spec: SchemaSpecifier::Id(*schema_id),
699 },
700 item: builtin.name().into(),
701 };
702 match builtin {
703 Builtin::Log(log) => {
704 let mut acl_items = vec![rbac::owner_privilege(
705 mz_sql::catalog::ObjectType::Source,
706 MZ_SYSTEM_ROLE_ID,
707 )];
708 acl_items.extend_from_slice(&log.access);
709 self.insert_item(
710 item_id,
711 log.oid,
712 name.clone(),
713 CatalogItem::Log(Log {
714 variant: log.variant,
715 global_id,
716 }),
717 MZ_SYSTEM_ROLE_ID,
718 PrivilegeMap::from_mz_acl_items(acl_items),
719 );
720 }
721
722 Builtin::Table(table) => {
723 let mut acl_items = vec![rbac::owner_privilege(
724 mz_sql::catalog::ObjectType::Table,
725 MZ_SYSTEM_ROLE_ID,
726 )];
727 acl_items.extend_from_slice(&table.access);
728
729 self.insert_item(
730 item_id,
731 table.oid,
732 name.clone(),
733 CatalogItem::Table(Table {
734 create_sql: None,
735 desc: VersionedRelationDesc::new(table.desc.clone()),
736 collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
737 conn_id: None,
738 resolved_ids: ResolvedIds::empty(),
739 custom_logical_compaction_window: table.is_retained_metrics_object.then(
740 || {
741 self.system_config()
742 .metrics_retention()
743 .try_into()
744 .expect("invalid metrics retention")
745 },
746 ),
747 is_retained_metrics_object: table.is_retained_metrics_object,
748 data_source: TableDataSource::TableWrites {
749 defaults: vec![Expr::null(); table.desc.arity()],
750 },
751 }),
752 MZ_SYSTEM_ROLE_ID,
753 PrivilegeMap::from_mz_acl_items(acl_items),
754 );
755 }
756 Builtin::Index(index) => {
757 let custom_logical_compaction_window =
758 index.is_retained_metrics_object.then(|| {
759 self.system_config()
760 .metrics_retention()
761 .try_into()
762 .expect("invalid metrics retention")
763 });
764 let versions = BTreeMap::new();
766
767 let item = self
768 .parse_item(
769 global_id,
770 &index.create_sql(),
771 &versions,
772 None,
773 index.is_retained_metrics_object,
774 custom_logical_compaction_window,
775 local_expression_cache,
776 None,
777 )
778 .unwrap_or_else(|e| {
779 panic!(
780 "internal error: failed to load bootstrap index:\n\
781 {}\n\
782 error:\n\
783 {:?}\n\n\
784 make sure that the schema name is specified in the builtin index's create sql statement.",
785 index.name, e
786 )
787 });
788 let CatalogItem::Index(_) = item else {
789 panic!(
790 "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
791 index.name
792 );
793 };
794
795 self.insert_item(
796 item_id,
797 index.oid,
798 name,
799 item,
800 MZ_SYSTEM_ROLE_ID,
801 PrivilegeMap::default(),
802 );
803 }
804 Builtin::View(_) => {
805 unreachable!("views added elsewhere");
807 }
808
809 Builtin::Type(typ) => {
811 let typ = self.resolve_builtin_type_references(typ);
812 if let CatalogType::Array { element_reference } = typ.details.typ {
813 let entry = self.get_entry_mut(&element_reference);
814 let item_type = match &mut entry.item {
815 CatalogItem::Type(item_type) => item_type,
816 _ => unreachable!("types can only reference other types"),
817 };
818 item_type.details.array_id = Some(item_id);
819 }
820
821 let schema_id = self.resolve_system_schema(typ.schema);
822
823 self.insert_item(
824 item_id,
825 typ.oid,
826 QualifiedItemName {
827 qualifiers: ItemQualifiers {
828 database_spec: ResolvedDatabaseSpecifier::Ambient,
829 schema_spec: SchemaSpecifier::Id(schema_id),
830 },
831 item: typ.name.to_owned(),
832 },
833 CatalogItem::Type(Type {
834 create_sql: None,
835 global_id,
836 details: typ.details.clone(),
837 resolved_ids: ResolvedIds::empty(),
838 }),
839 MZ_SYSTEM_ROLE_ID,
840 PrivilegeMap::from_mz_acl_items(vec![
841 rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
842 rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
843 ]),
844 );
845 }
846
847 Builtin::Func(func) => {
848 let oid = INVALID_OID;
852 self.insert_item(
853 item_id,
854 oid,
855 name.clone(),
856 CatalogItem::Func(Func {
857 inner: func.inner,
858 global_id,
859 }),
860 MZ_SYSTEM_ROLE_ID,
861 PrivilegeMap::default(),
862 );
863 }
864
865 Builtin::Source(coll) => {
866 let mut acl_items = vec![rbac::owner_privilege(
867 mz_sql::catalog::ObjectType::Source,
868 MZ_SYSTEM_ROLE_ID,
869 )];
870 acl_items.extend_from_slice(&coll.access);
871
872 self.insert_item(
873 item_id,
874 coll.oid,
875 name.clone(),
876 CatalogItem::Source(Source {
877 create_sql: None,
878 data_source: DataSourceDesc::Introspection(coll.data_source),
879 desc: coll.desc.clone(),
880 global_id,
881 timeline: Timeline::EpochMilliseconds,
882 resolved_ids: ResolvedIds::empty(),
883 custom_logical_compaction_window: coll.is_retained_metrics_object.then(
884 || {
885 self.system_config()
886 .metrics_retention()
887 .try_into()
888 .expect("invalid metrics retention")
889 },
890 ),
891 is_retained_metrics_object: coll.is_retained_metrics_object,
892 }),
893 MZ_SYSTEM_ROLE_ID,
894 PrivilegeMap::from_mz_acl_items(acl_items),
895 );
896 }
897 Builtin::ContinualTask(ct) => {
898 let mut acl_items = vec![rbac::owner_privilege(
899 mz_sql::catalog::ObjectType::Source,
900 MZ_SYSTEM_ROLE_ID,
901 )];
902 acl_items.extend_from_slice(&ct.access);
903 let versions = BTreeMap::new();
905
906 let item = self
907 .parse_item(
908 global_id,
909 &ct.create_sql(),
910 &versions,
911 None,
912 false,
913 None,
914 local_expression_cache,
915 None,
916 )
917 .unwrap_or_else(|e| {
918 panic!(
919 "internal error: failed to load bootstrap continual task:\n\
920 {}\n\
921 error:\n\
922 {:?}\n\n\
923 make sure that the schema name is specified in the builtin continual task's create sql statement.",
924 ct.name, e
925 )
926 });
927 let CatalogItem::ContinualTask(_) = &item else {
928 panic!(
929 "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
930 ct.name
931 );
932 };
933
934 self.insert_item(
935 item_id,
936 ct.oid,
937 name,
938 item,
939 MZ_SYSTEM_ROLE_ID,
940 PrivilegeMap::from_mz_acl_items(acl_items),
941 );
942 }
943 Builtin::Connection(connection) => {
944 let versions = BTreeMap::new();
946 let mut item = self
947 .parse_item(
948 global_id,
949 connection.sql,
950 &versions,
951 None,
952 false,
953 None,
954 local_expression_cache,
955 None,
956 )
957 .unwrap_or_else(|e| {
958 panic!(
959 "internal error: failed to load bootstrap connection:\n\
960 {}\n\
961 error:\n\
962 {:?}\n\n\
963 make sure that the schema name is specified in the builtin connection's create sql statement.",
964 connection.name, e
965 )
966 });
967 let CatalogItem::Connection(_) = &mut item else {
968 panic!(
969 "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
970 connection.name
971 );
972 };
973
974 let mut acl_items = vec![rbac::owner_privilege(
975 mz_sql::catalog::ObjectType::Connection,
976 connection.owner_id.clone(),
977 )];
978 acl_items.extend_from_slice(connection.access);
979
980 self.insert_item(
981 item_id,
982 connection.oid,
983 name.clone(),
984 item,
985 connection.owner_id.clone(),
986 PrivilegeMap::from_mz_acl_items(acl_items),
987 );
988 }
989 }
990 }
991
992 #[instrument(level = "debug")]
993 fn apply_temporary_item_update(
994 &mut self,
995 temporary_item: TemporaryItem,
996 diff: StateDiff,
997 retractions: &mut InProgressRetractions,
998 local_expression_cache: &mut LocalExpressionCache,
999 ) {
1000 match diff {
1001 StateDiff::Addition => {
1002 let TemporaryItem {
1003 id,
1004 oid,
1005 global_id,
1006 schema_id,
1007 name,
1008 conn_id,
1009 create_sql,
1010 owner_id,
1011 privileges,
1012 extra_versions,
1013 } = temporary_item;
1014 let temp_conn_id = conn_id
1017 .as_ref()
1018 .expect("temporary items must have a connection id");
1019 if !self.temporary_schemas.contains_key(temp_conn_id) {
1020 self.create_temporary_schema(temp_conn_id, owner_id)
1021 .expect("failed to create temporary schema");
1022 }
1023 let schema = self.find_temp_schema(&schema_id);
1024 let name = QualifiedItemName {
1025 qualifiers: ItemQualifiers {
1026 database_spec: schema.database().clone(),
1027 schema_spec: schema.id().clone(),
1028 },
1029 item: name.clone(),
1030 };
1031
1032 let entry = match retractions.temp_items.remove(&id) {
1033 Some(mut retraction) => {
1034 assert_eq!(retraction.id, id);
1035
1036 if retraction.create_sql() != create_sql {
1041 let mut catalog_item = self
1042 .deserialize_item(
1043 global_id,
1044 &create_sql,
1045 &extra_versions,
1046 local_expression_cache,
1047 Some(retraction.item),
1048 )
1049 .unwrap_or_else(|e| {
1050 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1051 });
1052 catalog_item.set_conn_id(conn_id);
1059 retraction.item = catalog_item;
1060 }
1061
1062 retraction.id = id;
1063 retraction.oid = oid;
1064 retraction.name = name;
1065 retraction.owner_id = owner_id;
1066 retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1067 retraction
1068 }
1069 None => {
1070 let mut catalog_item = self
1071 .deserialize_item(
1072 global_id,
1073 &create_sql,
1074 &extra_versions,
1075 local_expression_cache,
1076 None,
1077 )
1078 .unwrap_or_else(|e| {
1079 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1080 });
1081
1082 catalog_item.set_conn_id(conn_id);
1088
1089 CatalogEntry {
1090 item: catalog_item,
1091 referenced_by: Vec::new(),
1092 used_by: Vec::new(),
1093 id,
1094 oid,
1095 name,
1096 owner_id,
1097 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1098 }
1099 }
1100 };
1101 self.insert_entry(entry);
1102 }
1103 StateDiff::Retraction => {
1104 let entry = self.drop_item(temporary_item.id);
1105 retractions.temp_items.insert(temporary_item.id, entry);
1106 }
1107 }
1108 }
1109
1110 #[instrument(level = "debug")]
1111 fn apply_item_update(
1112 &mut self,
1113 item: mz_catalog::durable::Item,
1114 diff: StateDiff,
1115 retractions: &mut InProgressRetractions,
1116 local_expression_cache: &mut LocalExpressionCache,
1117 ) -> Result<(), CatalogError> {
1118 match diff {
1119 StateDiff::Addition => {
1120 let key = item.key();
1121 let mz_catalog::durable::Item {
1122 id,
1123 oid,
1124 global_id,
1125 schema_id,
1126 name,
1127 create_sql,
1128 owner_id,
1129 privileges,
1130 extra_versions,
1131 } = item;
1132 let schema = self.find_non_temp_schema(&schema_id);
1133 let name = QualifiedItemName {
1134 qualifiers: ItemQualifiers {
1135 database_spec: schema.database().clone(),
1136 schema_spec: schema.id().clone(),
1137 },
1138 item: name.clone(),
1139 };
1140 let entry = match retractions.items.remove(&key) {
1141 Some(retraction) => {
1142 assert_eq!(retraction.id, item.id);
1143
1144 let item = self
1145 .deserialize_item(
1146 global_id,
1147 &create_sql,
1148 &extra_versions,
1149 local_expression_cache,
1150 Some(retraction.item),
1151 )
1152 .unwrap_or_else(|e| {
1153 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1154 });
1155
1156 CatalogEntry {
1157 item,
1158 id,
1159 oid,
1160 name,
1161 owner_id,
1162 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1163 referenced_by: retraction.referenced_by,
1164 used_by: retraction.used_by,
1165 }
1166 }
1167 None => {
1168 let catalog_item = self
1169 .deserialize_item(
1170 global_id,
1171 &create_sql,
1172 &extra_versions,
1173 local_expression_cache,
1174 None,
1175 )
1176 .unwrap_or_else(|e| {
1177 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1178 });
1179 CatalogEntry {
1180 item: catalog_item,
1181 referenced_by: Vec::new(),
1182 used_by: Vec::new(),
1183 id,
1184 oid,
1185 name,
1186 owner_id,
1187 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1188 }
1189 }
1190 };
1191
1192 self.insert_entry(entry);
1193 }
1194 StateDiff::Retraction => {
1195 let entry = self.drop_item(item.id);
1196 let key = item.into_key_value().0;
1197 retractions.items.insert(key, entry);
1198 }
1199 }
1200 Ok(())
1201 }
1202
1203 #[instrument(level = "debug")]
1204 fn apply_comment_update(
1205 &mut self,
1206 comment: mz_catalog::durable::Comment,
1207 diff: StateDiff,
1208 _retractions: &mut InProgressRetractions,
1209 ) {
1210 match diff {
1211 StateDiff::Addition => {
1212 let prev = self.comments.update_comment(
1213 comment.object_id,
1214 comment.sub_component,
1215 Some(comment.comment),
1216 );
1217 assert_eq!(
1218 prev, None,
1219 "values must be explicitly retracted before inserting a new value"
1220 );
1221 }
1222 StateDiff::Retraction => {
1223 let prev =
1224 self.comments
1225 .update_comment(comment.object_id, comment.sub_component, None);
1226 assert_eq!(
1227 prev,
1228 Some(comment.comment),
1229 "retraction does not match existing value: ({:?}, {:?})",
1230 comment.object_id,
1231 comment.sub_component,
1232 );
1233 }
1234 }
1235 }
1236
1237 #[instrument(level = "debug")]
1238 fn apply_source_references_update(
1239 &mut self,
1240 source_references: mz_catalog::durable::SourceReferences,
1241 diff: StateDiff,
1242 _retractions: &mut InProgressRetractions,
1243 ) {
1244 match diff {
1245 StateDiff::Addition => {
1246 let prev = self
1247 .source_references
1248 .insert(source_references.source_id, source_references.into());
1249 assert!(
1250 prev.is_none(),
1251 "values must be explicitly retracted before inserting a new value: {prev:?}"
1252 );
1253 }
1254 StateDiff::Retraction => {
1255 let prev = self.source_references.remove(&source_references.source_id);
1256 assert!(
1257 prev.is_some(),
1258 "retraction for a non-existent existing value: {source_references:?}"
1259 );
1260 }
1261 }
1262 }
1263
1264 #[instrument(level = "debug")]
1265 fn apply_storage_collection_metadata_update(
1266 &mut self,
1267 storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1268 diff: StateDiff,
1269 _retractions: &mut InProgressRetractions,
1270 ) {
1271 apply_inverted_lookup(
1272 &mut self.storage_metadata.collection_metadata,
1273 &storage_collection_metadata.id,
1274 storage_collection_metadata.shard,
1275 diff,
1276 );
1277 }
1278
1279 #[instrument(level = "debug")]
1280 fn apply_unfinalized_shard_update(
1281 &mut self,
1282 unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1283 diff: StateDiff,
1284 _retractions: &mut InProgressRetractions,
1285 ) {
1286 match diff {
1287 StateDiff::Addition => {
1288 let newly_inserted = self
1289 .storage_metadata
1290 .unfinalized_shards
1291 .insert(unfinalized_shard.shard);
1292 assert!(
1293 newly_inserted,
1294 "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1295 );
1296 }
1297 StateDiff::Retraction => {
1298 let removed = self
1299 .storage_metadata
1300 .unfinalized_shards
1301 .remove(&unfinalized_shard.shard);
1302 assert!(
1303 removed,
1304 "retraction does not match existing value: {unfinalized_shard:?}"
1305 );
1306 }
1307 }
1308 }
1309
1310 #[instrument]
1313 pub(crate) fn generate_builtin_table_updates(
1314 &self,
1315 updates: Vec<StateUpdate>,
1316 ) -> Vec<BuiltinTableUpdate> {
1317 let mut builtin_table_updates = Vec::new();
1318 for StateUpdate { kind, ts: _, diff } in updates {
1319 let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1320 let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1321 builtin_table_updates.extend(builtin_table_update);
1322 }
1323 builtin_table_updates
1324 }
1325
1326 #[instrument(level = "debug")]
1329 pub(crate) fn generate_builtin_table_update(
1330 &self,
1331 kind: StateUpdateKind,
1332 diff: StateDiff,
1333 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1334 let diff = diff.into();
1335 match kind {
1336 StateUpdateKind::Role(role) => {
1337 let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1338 for group_id in role.membership.map.keys() {
1339 builtin_table_updates
1340 .push(self.pack_role_members_update(*group_id, role.id, diff))
1341 }
1342 builtin_table_updates
1343 }
1344 StateUpdateKind::RoleAuth(role_auth) => {
1345 vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1346 }
1347 StateUpdateKind::Database(database) => {
1348 vec![self.pack_database_update(&database.id, diff)]
1349 }
1350 StateUpdateKind::Schema(schema) => {
1351 let db_spec = schema.database_id.into();
1352 vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1353 }
1354 StateUpdateKind::DefaultPrivilege(default_privilege) => {
1355 vec![self.pack_default_privileges_update(
1356 &default_privilege.object,
1357 &default_privilege.acl_item.grantee,
1358 &default_privilege.acl_item.acl_mode,
1359 diff,
1360 )]
1361 }
1362 StateUpdateKind::SystemPrivilege(system_privilege) => {
1363 vec![self.pack_system_privileges_update(system_privilege, diff)]
1364 }
1365 StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1366 StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1367 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1368 self.pack_item_update(introspection_source_index.item_id, diff)
1369 }
1370 StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1371 cluster_replica.cluster_id,
1372 &cluster_replica.name,
1373 diff,
1374 ),
1375 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1376 if !system_object_mapping.unique_identifier.runtime_alterable() {
1380 self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1381 } else {
1382 vec![]
1383 }
1384 }
1385 StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1386 StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1387 StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1388 comment.object_id,
1389 comment.sub_component,
1390 &comment.comment,
1391 diff,
1392 )],
1393 StateUpdateKind::SourceReferences(source_references) => {
1394 self.pack_source_references_update(&source_references, diff)
1395 }
1396 StateUpdateKind::AuditLog(audit_log) => {
1397 vec![
1398 self.pack_audit_log_update(&audit_log.event, diff)
1399 .expect("could not pack audit log update"),
1400 ]
1401 }
1402 StateUpdateKind::NetworkPolicy(policy) => self
1403 .pack_network_policy_update(&policy.id, diff)
1404 .expect("could not pack audit log update"),
1405 StateUpdateKind::StorageCollectionMetadata(_)
1406 | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1407 }
1408 }
1409
1410 fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1411 self.entry_by_id
1412 .get_mut(id)
1413 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1414 }
1415
1416 fn get_schema_mut(
1417 &mut self,
1418 database_spec: &ResolvedDatabaseSpecifier,
1419 schema_spec: &SchemaSpecifier,
1420 conn_id: &ConnectionId,
1421 ) -> &mut Schema {
1422 match (database_spec, schema_spec) {
1424 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1425 .temporary_schemas
1426 .get_mut(conn_id)
1427 .expect("catalog out of sync"),
1428 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1429 .ambient_schemas_by_id
1430 .get_mut(id)
1431 .expect("catalog out of sync"),
1432 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1433 .database_by_id
1434 .get_mut(database_id)
1435 .expect("catalog out of sync")
1436 .schemas_by_id
1437 .get_mut(schema_id)
1438 .expect("catalog out of sync"),
1439 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1440 unreachable!("temporary schemas are in the ambient database")
1441 }
1442 }
1443 }
1444
1445 #[instrument(name = "catalog::parse_views")]
1455 async fn parse_builtin_views(
1456 state: &mut CatalogState,
1457 builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
1458 retractions: &mut InProgressRetractions,
1459 local_expression_cache: &mut LocalExpressionCache,
1460 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1461 let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
1462 let (updates, additions): (Vec<_>, Vec<_>) =
1463 builtin_views
1464 .into_iter()
1465 .partition_map(|(view, item_id, gid)| {
1466 match retractions.system_object_mappings.remove(&item_id) {
1467 Some(entry) => Either::Left(entry),
1468 None => Either::Right((view, item_id, gid)),
1469 }
1470 });
1471
1472 for entry in updates {
1473 let item_id = entry.id();
1478 state.insert_entry(entry);
1479 builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
1480 }
1481
1482 let mut handles = Vec::new();
1483 let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
1484 BTreeMap::new();
1485 let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
1486 let mut awaiting_all = Vec::new();
1489 let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
1491 let mut completed_names: BTreeSet<String> = BTreeSet::new();
1492
1493 let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
1495 .into_iter()
1496 .map(|(view, item_id, gid)| (item_id, (view, gid)))
1497 .collect();
1498 let item_ids: Vec<_> = views.keys().copied().collect();
1499
1500 let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
1501 while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
1502 if handles.is_empty() && ready.is_empty() {
1503 ready.extend(awaiting_all.drain(..));
1505 }
1506
1507 if !ready.is_empty() {
1509 let spawn_state = Arc::new(state.clone());
1510 while let Some(id) = ready.pop_front() {
1511 let (view, global_id) = views.get(&id).expect("must exist");
1512 let global_id = *global_id;
1513 let create_sql = view.create_sql();
1514 let versions = BTreeMap::new();
1516
1517 let span = info_span!(parent: None, "parse builtin view", name = view.name);
1518 OpenTelemetryContext::obtain().attach_as_parent_to(&span);
1519 let task_state = Arc::clone(&spawn_state);
1520 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1521 let handle = mz_ore::task::spawn_blocking(
1522 || "parse view",
1523 move || {
1524 span.in_scope(|| {
1525 let res = task_state.parse_item_inner(
1526 global_id,
1527 &create_sql,
1528 &versions,
1529 None,
1530 false,
1531 None,
1532 cached_expr,
1533 None,
1534 );
1535 (id, global_id, res)
1536 })
1537 },
1538 );
1539 handles.push(handle);
1540 }
1541 }
1542
1543 let (selected, _idx, remaining) = future::select_all(handles).await;
1545 handles = remaining;
1546 let (id, global_id, res) = selected;
1547 let mut insert_cached_expr = |cached_expr| {
1548 if let Some(cached_expr) = cached_expr {
1549 local_expression_cache.insert_cached_expression(global_id, cached_expr);
1550 }
1551 };
1552 match res {
1553 Ok((item, uncached_expr)) => {
1554 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1555 local_expression_cache.insert_uncached_expression(
1556 global_id,
1557 uncached_expr,
1558 optimizer_features,
1559 );
1560 }
1561 let (view, _gid) = views.remove(&id).expect("must exist");
1563 let schema_id = state
1564 .ambient_schemas_by_name
1565 .get(view.schema)
1566 .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
1567 let qname = QualifiedItemName {
1568 qualifiers: ItemQualifiers {
1569 database_spec: ResolvedDatabaseSpecifier::Ambient,
1570 schema_spec: SchemaSpecifier::Id(*schema_id),
1571 },
1572 item: view.name.into(),
1573 };
1574 let mut acl_items = vec![rbac::owner_privilege(
1575 mz_sql::catalog::ObjectType::View,
1576 MZ_SYSTEM_ROLE_ID,
1577 )];
1578 acl_items.extend_from_slice(&view.access);
1579
1580 state.insert_item(
1581 id,
1582 view.oid,
1583 qname,
1584 item,
1585 MZ_SYSTEM_ROLE_ID,
1586 PrivilegeMap::from_mz_acl_items(acl_items),
1587 );
1588
1589 let mut resolved_dependent_items = Vec::new();
1591 if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
1592 resolved_dependent_items.extend(dependent_items);
1593 }
1594 let entry = state.get_entry(&id);
1595 let full_name = state.resolve_full_name(entry.name(), None).to_string();
1596 if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
1597 resolved_dependent_items.extend(dependent_items);
1598 }
1599 ready.extend(resolved_dependent_items);
1600
1601 completed_ids.insert(id);
1602 completed_names.insert(full_name);
1603 }
1604 Err((
1606 AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
1607 cached_expr,
1608 )) => {
1609 insert_cached_expr(cached_expr);
1610 if completed_ids.contains(&missing_dep) {
1611 ready.push_back(id);
1612 } else {
1613 awaiting_id_dependencies
1614 .entry(missing_dep)
1615 .or_default()
1616 .push(id);
1617 }
1618 }
1619 Err((
1621 AdapterError::PlanError(plan::PlanError::Catalog(
1622 SqlCatalogError::UnknownItem(missing_dep),
1623 )),
1624 cached_expr,
1625 )) => {
1626 insert_cached_expr(cached_expr);
1627 match CatalogItemId::from_str(&missing_dep) {
1628 Ok(missing_dep) => {
1629 if completed_ids.contains(&missing_dep) {
1630 ready.push_back(id);
1631 } else {
1632 awaiting_id_dependencies
1633 .entry(missing_dep)
1634 .or_default()
1635 .push(id);
1636 }
1637 }
1638 Err(_) => {
1639 if completed_names.contains(&missing_dep) {
1640 ready.push_back(id);
1641 } else {
1642 awaiting_name_dependencies
1643 .entry(missing_dep)
1644 .or_default()
1645 .push(id);
1646 }
1647 }
1648 }
1649 }
1650 Err((
1651 AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
1652 cached_expr,
1653 )) => {
1654 insert_cached_expr(cached_expr);
1655 awaiting_all.push(id);
1656 }
1657 Err((e, _)) => {
1658 let (bad_view, _gid) = views.get(&id).expect("must exist");
1659 panic!(
1660 "internal error: failed to load bootstrap view:\n\
1661 {name}\n\
1662 error:\n\
1663 {e:?}\n\n\
1664 Make sure that the schema name is specified in the builtin view's create sql statement.
1665 ",
1666 name = bad_view.name,
1667 )
1668 }
1669 }
1670 }
1671
1672 assert!(awaiting_id_dependencies.is_empty());
1673 assert!(
1674 awaiting_name_dependencies.is_empty(),
1675 "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
1676 );
1677 assert!(awaiting_all.is_empty());
1678 assert!(views.is_empty());
1679
1680 builtin_table_updates.extend(
1682 item_ids
1683 .into_iter()
1684 .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
1685 );
1686
1687 builtin_table_updates
1688 }
1689
1690 fn insert_entry(&mut self, entry: CatalogEntry) {
1692 if !entry.id.is_system() {
1693 if let Some(cluster_id) = entry.item.cluster_id() {
1694 self.clusters_by_id
1695 .get_mut(&cluster_id)
1696 .expect("catalog out of sync")
1697 .bound_objects
1698 .insert(entry.id);
1699 };
1700 }
1701
1702 for u in entry.references().items() {
1703 match self.entry_by_id.get_mut(u) {
1704 Some(metadata) => metadata.referenced_by.push(entry.id()),
1705 None => panic!(
1706 "Catalog: missing dependent catalog item {} while installing {}",
1707 &u,
1708 self.resolve_full_name(entry.name(), entry.conn_id())
1709 ),
1710 }
1711 }
1712 for u in entry.uses() {
1713 if u == entry.id() {
1716 continue;
1717 }
1718 match self.entry_by_id.get_mut(&u) {
1719 Some(metadata) => metadata.used_by.push(entry.id()),
1720 None => panic!(
1721 "Catalog: missing dependent catalog item {} while installing {}",
1722 &u,
1723 self.resolve_full_name(entry.name(), entry.conn_id())
1724 ),
1725 }
1726 }
1727 for gid in entry.item.global_ids() {
1728 self.entry_by_global_id.insert(gid, entry.id());
1729 }
1730 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1731 if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
1734 && !self.temporary_schemas.contains_key(conn_id)
1735 {
1736 self.create_temporary_schema(conn_id, entry.owner_id)
1737 .expect("failed to create temporary schema");
1738 }
1739 let schema = self.get_schema_mut(
1740 &entry.name().qualifiers.database_spec,
1741 &entry.name().qualifiers.schema_spec,
1742 conn_id,
1743 );
1744
1745 let prev_id = match entry.item() {
1746 CatalogItem::Func(_) => schema
1747 .functions
1748 .insert(entry.name().item.clone(), entry.id()),
1749 CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1750 _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1751 };
1752
1753 assert!(
1754 prev_id.is_none(),
1755 "builtin name collision on {:?}",
1756 entry.name().item.clone()
1757 );
1758
1759 self.entry_by_id.insert(entry.id(), entry.clone());
1760 }
1761
1762 fn insert_item(
1764 &mut self,
1765 id: CatalogItemId,
1766 oid: u32,
1767 name: QualifiedItemName,
1768 item: CatalogItem,
1769 owner_id: RoleId,
1770 privileges: PrivilegeMap,
1771 ) {
1772 let entry = CatalogEntry {
1773 item,
1774 name,
1775 id,
1776 oid,
1777 used_by: Vec::new(),
1778 referenced_by: Vec::new(),
1779 owner_id,
1780 privileges,
1781 };
1782
1783 self.insert_entry(entry);
1784 }
1785
1786 #[mz_ore::instrument(level = "trace")]
1787 fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1788 let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1789 for u in metadata.references().items() {
1790 if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1791 dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1792 }
1793 }
1794 for u in metadata.uses() {
1795 if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1796 dep_metadata.used_by.retain(|u| *u != metadata.id())
1797 }
1798 }
1799 for gid in metadata.global_ids() {
1800 self.entry_by_global_id.remove(&gid);
1801 }
1802
1803 let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1804 let schema = self.get_schema_mut(
1805 &metadata.name().qualifiers.database_spec,
1806 &metadata.name().qualifiers.schema_spec,
1807 conn_id,
1808 );
1809 if metadata.item_type() == CatalogItemType::Type {
1810 schema
1811 .types
1812 .remove(&metadata.name().item)
1813 .expect("catalog out of sync");
1814 } else {
1815 assert_ne!(metadata.item_type(), CatalogItemType::Func);
1818
1819 schema
1820 .items
1821 .remove(&metadata.name().item)
1822 .expect("catalog out of sync");
1823 };
1824
1825 if !id.is_system() {
1826 if let Some(cluster_id) = metadata.item().cluster_id() {
1827 assert!(
1828 self.clusters_by_id
1829 .get_mut(&cluster_id)
1830 .expect("catalog out of sync")
1831 .bound_objects
1832 .remove(&id),
1833 "catalog out of sync"
1834 );
1835 }
1836 }
1837
1838 metadata
1839 }
1840
1841 fn insert_introspection_source_index(
1842 &mut self,
1843 cluster_id: ClusterId,
1844 log: &'static BuiltinLog,
1845 item_id: CatalogItemId,
1846 global_id: GlobalId,
1847 oid: u32,
1848 ) {
1849 let (index_name, index) =
1850 self.create_introspection_source_index(cluster_id, log, global_id);
1851 self.insert_item(
1852 item_id,
1853 oid,
1854 index_name,
1855 index,
1856 MZ_SYSTEM_ROLE_ID,
1857 PrivilegeMap::default(),
1858 );
1859 }
1860
1861 fn create_introspection_source_index(
1862 &self,
1863 cluster_id: ClusterId,
1864 log: &'static BuiltinLog,
1865 global_id: GlobalId,
1866 ) -> (QualifiedItemName, CatalogItem) {
1867 let source_name = FullItemName {
1868 database: RawDatabaseSpecifier::Ambient,
1869 schema: log.schema.into(),
1870 item: log.name.into(),
1871 };
1872 let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1873 let mut index_name = QualifiedItemName {
1874 qualifiers: ItemQualifiers {
1875 database_spec: ResolvedDatabaseSpecifier::Ambient,
1876 schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1877 },
1878 item: index_name.clone(),
1879 };
1880 index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1881 let index_item_name = index_name.item.clone();
1882 let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1883 let index = CatalogItem::Index(Index {
1884 global_id,
1885 on: log_global_id,
1886 keys: log
1887 .variant
1888 .index_by()
1889 .into_iter()
1890 .map(MirScalarExpr::column)
1891 .collect(),
1892 create_sql: index_sql(
1893 index_item_name,
1894 cluster_id,
1895 source_name,
1896 &log.variant.desc(),
1897 &log.variant.index_by(),
1898 ),
1899 conn_id: None,
1900 resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1901 cluster_id,
1902 is_retained_metrics_object: false,
1903 custom_logical_compaction_window: None,
1904 });
1905 (index_name, index)
1906 }
1907
1908 fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1913 Ok(self.system_configuration.set(name, value)?)
1914 }
1915
1916 fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1921 Ok(self.system_configuration.reset(name)?)
1922 }
1923}
1924
1925fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1932 fn push_update<T>(
1933 update: T,
1934 diff: StateDiff,
1935 retractions: &mut Vec<T>,
1936 additions: &mut Vec<T>,
1937 ) {
1938 match diff {
1939 StateDiff::Retraction => retractions.push(update),
1940 StateDiff::Addition => additions.push(update),
1941 }
1942 }
1943
1944 soft_assert_no_log!(
1945 updates.iter().map(|update| update.ts).all_equal(),
1946 "all timestamps should be equal: {updates:?}"
1947 );
1948
1949 let mut pre_cluster_retractions = Vec::new();
1951 let mut pre_cluster_additions = Vec::new();
1952 let mut cluster_retractions = Vec::new();
1953 let mut cluster_additions = Vec::new();
1954 let mut builtin_item_updates = Vec::new();
1955 let mut item_retractions = Vec::new();
1956 let mut item_additions = Vec::new();
1957 let mut temp_item_retractions = Vec::new();
1958 let mut temp_item_additions = Vec::new();
1959 let mut post_item_retractions = Vec::new();
1960 let mut post_item_additions = Vec::new();
1961 for update in updates {
1962 let diff = update.diff.clone();
1963 match update.kind {
1964 StateUpdateKind::Role(_)
1965 | StateUpdateKind::RoleAuth(_)
1966 | StateUpdateKind::Database(_)
1967 | StateUpdateKind::Schema(_)
1968 | StateUpdateKind::DefaultPrivilege(_)
1969 | StateUpdateKind::SystemPrivilege(_)
1970 | StateUpdateKind::SystemConfiguration(_)
1971 | StateUpdateKind::NetworkPolicy(_) => push_update(
1972 update,
1973 diff,
1974 &mut pre_cluster_retractions,
1975 &mut pre_cluster_additions,
1976 ),
1977 StateUpdateKind::Cluster(_)
1978 | StateUpdateKind::IntrospectionSourceIndex(_)
1979 | StateUpdateKind::ClusterReplica(_) => push_update(
1980 update,
1981 diff,
1982 &mut cluster_retractions,
1983 &mut cluster_additions,
1984 ),
1985 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1986 builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
1987 }
1988 StateUpdateKind::TemporaryItem(item) => push_update(
1989 (item, update.ts, update.diff),
1990 diff,
1991 &mut temp_item_retractions,
1992 &mut temp_item_additions,
1993 ),
1994 StateUpdateKind::Item(item) => push_update(
1995 (item, update.ts, update.diff),
1996 diff,
1997 &mut item_retractions,
1998 &mut item_additions,
1999 ),
2000 StateUpdateKind::Comment(_)
2001 | StateUpdateKind::SourceReferences(_)
2002 | StateUpdateKind::AuditLog(_)
2003 | StateUpdateKind::StorageCollectionMetadata(_)
2004 | StateUpdateKind::UnfinalizedShard(_) => push_update(
2005 update,
2006 diff,
2007 &mut post_item_retractions,
2008 &mut post_item_additions,
2009 ),
2010 }
2011 }
2012
2013 let builtin_item_updates = builtin_item_updates
2015 .into_iter()
2016 .map(|(system_object_mapping, ts, diff)| {
2017 let idx = BUILTIN_LOOKUP
2018 .get(&system_object_mapping.description)
2019 .expect("missing builtin")
2020 .0;
2021 (idx, system_object_mapping, ts, diff)
2022 })
2023 .sorted_by_key(|(idx, _, _, _)| *idx)
2024 .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2025
2026 let mut other_builtin_retractions = Vec::new();
2028 let mut other_builtin_additions = Vec::new();
2029 let mut builtin_index_retractions = Vec::new();
2030 let mut builtin_index_additions = Vec::new();
2031 for (builtin_item_update, ts, diff) in builtin_item_updates {
2032 match &builtin_item_update.description.object_type {
2033 CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
2034 StateUpdate {
2035 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2036 ts,
2037 diff,
2038 },
2039 diff,
2040 &mut builtin_index_retractions,
2041 &mut builtin_index_additions,
2042 ),
2043 CatalogItemType::Table
2044 | CatalogItemType::Source
2045 | CatalogItemType::Sink
2046 | CatalogItemType::View
2047 | CatalogItemType::MaterializedView
2048 | CatalogItemType::Type
2049 | CatalogItemType::Func
2050 | CatalogItemType::Secret
2051 | CatalogItemType::Connection => push_update(
2052 StateUpdate {
2053 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2054 ts,
2055 diff,
2056 },
2057 diff,
2058 &mut other_builtin_retractions,
2059 &mut other_builtin_additions,
2060 ),
2061 }
2062 }
2063
2064 fn sort_connections(connections: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2071 let mut topo: BTreeMap<
2072 (mz_catalog::durable::Item, Timestamp, StateDiff),
2073 BTreeSet<CatalogItemId>,
2074 > = BTreeMap::default();
2075 let existing_connections: BTreeSet<_> = connections.iter().map(|item| item.0.id).collect();
2076
2077 tracing::debug!(?connections, "sorting connections");
2079 for (connection, ts, diff) in connections.drain(..) {
2080 let statement = mz_sql::parse::parse(&connection.create_sql)
2081 .expect("valid CONNECTION create_sql")
2082 .into_element()
2083 .ast;
2084 let mut dependencies = mz_sql::names::dependencies(&statement)
2085 .expect("failed to find dependencies of CONNECTION");
2086 dependencies.remove(&connection.id);
2088 dependencies.retain(|dep| existing_connections.contains(dep));
2091
2092 assert_none!(topo.insert((connection, ts, diff), dependencies));
2094 }
2095 tracing::debug!(?topo, ?existing_connections, "built topological sort");
2096
2097 while !topo.is_empty() {
2099 let no_deps: Vec<_> = topo
2101 .iter()
2102 .filter_map(|(item, deps)| {
2103 if deps.is_empty() {
2104 Some(item.clone())
2105 } else {
2106 None
2107 }
2108 })
2109 .collect();
2110
2111 if no_deps.is_empty() {
2113 panic!("programming error, cycle in Connections");
2114 }
2115
2116 for item in no_deps {
2118 topo.remove(&item);
2120 topo.values_mut().for_each(|deps| {
2122 deps.remove(&item.0.id);
2123 });
2124 connections.push(item);
2126 }
2127 }
2128 }
2129
2130 fn sort_item_updates(
2146 item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2147 ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2148 let mut types = Vec::new();
2151 let mut funcs = Vec::new();
2154 let mut secrets = Vec::new();
2155 let mut connections = Vec::new();
2156 let mut sources = Vec::new();
2157 let mut tables = Vec::new();
2158 let mut derived_items = Vec::new();
2159 let mut sinks = Vec::new();
2160 let mut continual_tasks = Vec::new();
2161
2162 for update in item_updates {
2163 match update.0.item_type() {
2164 CatalogItemType::Type => types.push(update),
2165 CatalogItemType::Func => funcs.push(update),
2166 CatalogItemType::Secret => secrets.push(update),
2167 CatalogItemType::Connection => connections.push(update),
2168 CatalogItemType::Source => sources.push(update),
2169 CatalogItemType::Table => tables.push(update),
2170 CatalogItemType::View
2171 | CatalogItemType::MaterializedView
2172 | CatalogItemType::Index => derived_items.push(update),
2173 CatalogItemType::Sink => sinks.push(update),
2174 CatalogItemType::ContinualTask => continual_tasks.push(update),
2175 }
2176 }
2177
2178 for group in [
2180 &mut types,
2181 &mut funcs,
2182 &mut secrets,
2183 &mut sources,
2184 &mut tables,
2185 &mut derived_items,
2186 &mut sinks,
2187 &mut continual_tasks,
2188 ] {
2189 group.sort_by_key(|(item, _, _)| item.id);
2190 }
2191
2192 sort_connections(&mut connections);
2196
2197 iter::empty()
2198 .chain(types)
2199 .chain(funcs)
2200 .chain(secrets)
2201 .chain(connections)
2202 .chain(sources)
2203 .chain(tables)
2204 .chain(derived_items)
2205 .chain(sinks)
2206 .chain(continual_tasks)
2207 .collect()
2208 }
2209
2210 let item_retractions = sort_item_updates(item_retractions);
2211 let item_additions = sort_item_updates(item_additions);
2212
2213 fn sort_temp_item_updates(
2217 temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2218 ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2219 let mut types = Vec::new();
2222 let mut funcs = Vec::new();
2224 let mut secrets = Vec::new();
2225 let mut connections = Vec::new();
2226 let mut sources = Vec::new();
2227 let mut tables = Vec::new();
2228 let mut derived_items = Vec::new();
2229 let mut sinks = Vec::new();
2230 let mut continual_tasks = Vec::new();
2231
2232 for update in temp_item_updates {
2233 match update.0.item_type() {
2234 CatalogItemType::Type => types.push(update),
2235 CatalogItemType::Func => funcs.push(update),
2236 CatalogItemType::Secret => secrets.push(update),
2237 CatalogItemType::Connection => connections.push(update),
2238 CatalogItemType::Source => sources.push(update),
2239 CatalogItemType::Table => tables.push(update),
2240 CatalogItemType::View
2241 | CatalogItemType::MaterializedView
2242 | CatalogItemType::Index => derived_items.push(update),
2243 CatalogItemType::Sink => sinks.push(update),
2244 CatalogItemType::ContinualTask => continual_tasks.push(update),
2245 }
2246 }
2247
2248 for group in [
2250 &mut types,
2251 &mut funcs,
2252 &mut secrets,
2253 &mut connections,
2254 &mut sources,
2255 &mut tables,
2256 &mut derived_items,
2257 &mut sinks,
2258 &mut continual_tasks,
2259 ] {
2260 group.sort_by_key(|(item, _, _)| item.id);
2261 }
2262
2263 iter::empty()
2264 .chain(types)
2265 .chain(funcs)
2266 .chain(secrets)
2267 .chain(connections)
2268 .chain(sources)
2269 .chain(tables)
2270 .chain(derived_items)
2271 .chain(sinks)
2272 .chain(continual_tasks)
2273 .collect()
2274 }
2275 let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2276 let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2277
2278 fn merge_item_updates(
2280 mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2281 mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2282 ) -> Vec<StateUpdate> {
2283 let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2284
2285 while let (Some((item, _, _)), Some((temp_item, _, _))) =
2286 (item_updates.front(), temp_item_updates.front())
2287 {
2288 if item.id < temp_item.id {
2289 let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2290 state_updates.push(StateUpdate {
2291 kind: StateUpdateKind::Item(item),
2292 ts,
2293 diff,
2294 });
2295 } else if item.id > temp_item.id {
2296 let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2297 state_updates.push(StateUpdate {
2298 kind: StateUpdateKind::TemporaryItem(temp_item),
2299 ts,
2300 diff,
2301 });
2302 } else {
2303 unreachable!(
2304 "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2305 );
2306 }
2307 }
2308
2309 while let Some((item, ts, diff)) = item_updates.pop_front() {
2310 state_updates.push(StateUpdate {
2311 kind: StateUpdateKind::Item(item),
2312 ts,
2313 diff,
2314 });
2315 }
2316
2317 while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2318 state_updates.push(StateUpdate {
2319 kind: StateUpdateKind::TemporaryItem(temp_item),
2320 ts,
2321 diff,
2322 });
2323 }
2324
2325 state_updates
2326 }
2327 let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2328 let item_additions = merge_item_updates(item_additions, temp_item_additions);
2329
2330 iter::empty()
2332 .chain(post_item_retractions.into_iter().rev())
2334 .chain(item_retractions.into_iter().rev())
2335 .chain(builtin_index_retractions.into_iter().rev())
2336 .chain(cluster_retractions.into_iter().rev())
2337 .chain(other_builtin_retractions.into_iter().rev())
2338 .chain(pre_cluster_retractions.into_iter().rev())
2339 .chain(pre_cluster_additions)
2340 .chain(other_builtin_additions)
2341 .chain(cluster_additions)
2342 .chain(builtin_index_additions)
2343 .chain(item_additions)
2344 .chain(post_item_additions)
2345 .collect()
2346}
2347
2348enum ApplyState {
2353 BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
2355 Items(Vec<StateUpdate>),
2361 Updates(Vec<StateUpdate>),
2363}
2364
2365impl ApplyState {
2366 fn new(update: StateUpdate) -> Self {
2367 use StateUpdateKind::*;
2368 match &update.kind {
2369 SystemObjectMapping(som)
2370 if som.description.object_type == CatalogItemType::View
2371 && update.diff == StateDiff::Addition =>
2372 {
2373 let view_addition = lookup_builtin_view_addition(som.clone());
2374 Self::BuiltinViewAdditions(vec![view_addition])
2375 }
2376
2377 IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
2378 Self::Items(vec![update])
2379 }
2380
2381 Role(_)
2382 | RoleAuth(_)
2383 | Database(_)
2384 | Schema(_)
2385 | DefaultPrivilege(_)
2386 | SystemPrivilege(_)
2387 | SystemConfiguration(_)
2388 | Cluster(_)
2389 | NetworkPolicy(_)
2390 | ClusterReplica(_)
2391 | SourceReferences(_)
2392 | Comment(_)
2393 | AuditLog(_)
2394 | StorageCollectionMetadata(_)
2395 | UnfinalizedShard(_) => Self::Updates(vec![update]),
2396 }
2397 }
2398
2399 async fn apply(
2405 self,
2406 state: &mut CatalogState,
2407 retractions: &mut InProgressRetractions,
2408 local_expression_cache: &mut LocalExpressionCache,
2409 ) -> (
2410 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2411 Vec<ParsedStateUpdate>,
2412 ) {
2413 match self {
2414 Self::BuiltinViewAdditions(builtin_view_additions) => {
2415 let restore = state.system_configuration.clone();
2416 state.system_configuration.enable_for_item_parsing();
2417 let builtin_table_updates = CatalogState::parse_builtin_views(
2418 state,
2419 builtin_view_additions,
2420 retractions,
2421 local_expression_cache,
2422 )
2423 .await;
2424 state.system_configuration = restore;
2425 (builtin_table_updates, Vec::new())
2426 }
2427 Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
2428 state
2429 .apply_updates_inner(updates, retractions, local_expression_cache)
2430 .expect("corrupt catalog")
2431 }),
2432 Self::Updates(updates) => state
2433 .apply_updates_inner(updates, retractions, local_expression_cache)
2434 .expect("corrupt catalog"),
2435 }
2436 }
2437
2438 async fn step(
2439 self,
2440 next: Self,
2441 state: &mut CatalogState,
2442 retractions: &mut InProgressRetractions,
2443 local_expression_cache: &mut LocalExpressionCache,
2444 ) -> (
2445 Self,
2446 (
2447 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2448 Vec<ParsedStateUpdate>,
2449 ),
2450 ) {
2451 match (self, next) {
2452 (
2453 Self::BuiltinViewAdditions(mut builtin_view_additions),
2454 Self::BuiltinViewAdditions(next_builtin_view_additions),
2455 ) => {
2456 builtin_view_additions.extend(next_builtin_view_additions);
2458 (
2459 Self::BuiltinViewAdditions(builtin_view_additions),
2460 (Vec::new(), Vec::new()),
2461 )
2462 }
2463 (Self::Items(mut updates), Self::Items(next_updates)) => {
2464 updates.extend(next_updates);
2466 (Self::Items(updates), (Vec::new(), Vec::new()))
2467 }
2468 (Self::Updates(mut updates), Self::Updates(next_updates)) => {
2469 updates.extend(next_updates);
2471 (Self::Updates(updates), (Vec::new(), Vec::new()))
2472 }
2473 (apply_state, next_apply_state) => {
2474 let updates = apply_state
2476 .apply(state, retractions, local_expression_cache)
2477 .await;
2478 (next_apply_state, updates)
2479 }
2480 }
2481 }
2482}
2483
2484fn apply_inverted_lookup<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: V, diff: StateDiff)
2489where
2490 K: Ord + Clone + Debug,
2491 V: PartialEq + Debug,
2492{
2493 match diff {
2494 StateDiff::Retraction => {
2495 let prev = map.remove(key);
2496 assert_eq!(
2497 prev,
2498 Some(value),
2499 "retraction does not match existing value: {key:?}"
2500 );
2501 }
2502 StateDiff::Addition => {
2503 let prev = map.insert(key.clone(), value);
2504 assert_eq!(
2505 prev, None,
2506 "values must be explicitly retracted before inserting a new value: {key:?}"
2507 );
2508 }
2509 }
2510}
2511
2512fn apply_with_update<K, V, D>(
2515 map: &mut BTreeMap<K, V>,
2516 durable: D,
2517 key_fn: impl FnOnce(&D) -> K,
2518 diff: StateDiff,
2519 retractions: &mut BTreeMap<D::Key, V>,
2520) where
2521 K: Ord,
2522 V: UpdateFrom<D> + PartialEq + Debug,
2523 D: DurableType,
2524 D::Key: Ord,
2525{
2526 match diff {
2527 StateDiff::Retraction => {
2528 let mem_key = key_fn(&durable);
2529 let value = map
2530 .remove(&mem_key)
2531 .expect("retraction does not match existing value: {key:?}");
2532 let durable_key = durable.into_key_value().0;
2533 retractions.insert(durable_key, value);
2534 }
2535 StateDiff::Addition => {
2536 let mem_key = key_fn(&durable);
2537 let durable_key = durable.key();
2538 let value = match retractions.remove(&durable_key) {
2539 Some(mut retraction) => {
2540 retraction.update_from(durable);
2541 retraction
2542 }
2543 None => durable.into(),
2544 };
2545 let prev = map.insert(mem_key, value);
2546 assert_eq!(
2547 prev, None,
2548 "values must be explicitly retracted before inserting a new value"
2549 );
2550 }
2551 }
2552}
2553
2554fn lookup_builtin_view_addition(
2556 mapping: SystemObjectMapping,
2557) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2558 let (_, builtin) = BUILTIN_LOOKUP
2559 .get(&mapping.description)
2560 .expect("missing builtin view");
2561 let Builtin::View(view) = builtin else {
2562 unreachable!("programming error, expected BuiltinView found {builtin:?}");
2563 };
2564
2565 (
2566 view,
2567 mapping.unique_identifier.catalog_id,
2568 mapping.unique_identifier.global_id,
2569 )
2570}