1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures::future;
20use itertools::{Either, Itertools};
21use mz_adapter_types::connection::ConnectionId;
22use mz_catalog::SYSTEM_CONN_ID;
23use mz_catalog::builtin::{
24 BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
25};
26use mz_catalog::durable::objects::{
27 ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
28 SchemaKey,
29};
30use mz_catalog::durable::{CatalogError, SystemObjectMapping};
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
34 NetworkPolicy, Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
35 TableDataSource, TemporaryItem, Type, UpdateFrom,
36};
37use mz_compute_types::config::ComputeReplicaConfig;
38use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
39use mz_controller_types::ClusterId;
40use mz_expr::MirScalarExpr;
41use mz_ore::tracing::OpenTelemetryContext;
42use mz_ore::{instrument, soft_assert_no_log};
43use mz_pgrepr::oid::INVALID_OID;
44use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
45use mz_repr::role_id::RoleId;
46use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
47use mz_sql::catalog::CatalogError as SqlCatalogError;
48use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
49use mz_sql::names::{
50 FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
51 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
52};
53use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
54use mz_sql::session::vars::{VarError, VarInput};
55use mz_sql::{plan, rbac};
56use mz_sql_parser::ast::Expr;
57use mz_storage_types::sources::Timeline;
58use tracing::{Instrument, info_span, warn};
59
60use crate::AdapterError;
61use crate::catalog::state::LocalExpressionCache;
62use crate::catalog::{BuiltinTableUpdate, CatalogState};
63use crate::util::index_sql;
64
65#[derive(Debug, Clone, Default)]
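/// Tracks retractions that are still "in progress" while a batch of updates for a single
/// timestamp is applied. When a retraction is immediately followed by an addition of the same
/// object, the existing entry is reused so that the pair behaves like an in-place update.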
77struct InProgressRetractions {
78 roles: BTreeMap<RoleKey, Role>,
79 role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
80 databases: BTreeMap<DatabaseKey, Database>,
81 schemas: BTreeMap<SchemaKey, Schema>,
82 clusters: BTreeMap<ClusterKey, Cluster>,
83 network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
84 items: BTreeMap<ItemKey, CatalogEntry>,
85 temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
86 introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
87 system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
88}
89
90impl CatalogState {
91 #[must_use]
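/// Applies `updates` to the in-memory catalog state during bootstrap and returns the
/// corresponding builtin table updates.
///
/// Updates are applied in timestamp order. Within a timestamp, consecutive builtin view
/// additions are batched (see `BootstrapApplyState`) so their SQL can be parsed concurrently
/// by `parse_builtin_views`.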
98 #[instrument]
99 pub(crate) async fn apply_updates_for_bootstrap(
100 &mut self,
101 updates: Vec<StateUpdate>,
102 local_expression_cache: &mut LocalExpressionCache,
103 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
104 let mut builtin_table_updates = Vec::with_capacity(updates.len());
105 let updates = sort_updates(updates);
106
107 let mut groups: Vec<Vec<_>> = Vec::new();
108 for (_, updates) in &updates.into_iter().group_by(|update| update.ts) {
109 groups.push(updates.collect());
110 }
111 for updates in groups {
112 let mut apply_state = BootstrapApplyState::Updates(Vec::new());
113 let mut retractions = InProgressRetractions::default();
114
115 for update in updates {
116 let next_apply_state = BootstrapApplyState::new(update);
117 let (next_apply_state, builtin_table_update) = apply_state
118 .step(
119 next_apply_state,
120 self,
121 &mut retractions,
122 local_expression_cache,
123 )
124 .await;
125 apply_state = next_apply_state;
126 builtin_table_updates.extend(builtin_table_update);
127 }
128
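// Apply the final batch of updates accumulated for this timestamp.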
129 let builtin_table_update = apply_state
131 .apply(self, &mut retractions, local_expression_cache)
132 .await;
133 builtin_table_updates.extend(builtin_table_update);
134 }
135 builtin_table_updates
136 }
137
138 #[instrument]
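/// Applies `updates` to the in-memory catalog state and returns the corresponding builtin
/// table updates, processing one timestamp group at a time.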
142 pub(crate) fn apply_updates(
143 &mut self,
144 updates: Vec<StateUpdate>,
145 ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
146 let mut builtin_table_updates = Vec::with_capacity(updates.len());
147 let updates = sort_updates(updates);
148
149 for (_, updates) in &updates.into_iter().group_by(|update| update.ts) {
150 let mut retractions = InProgressRetractions::default();
151 let builtin_table_update = self.apply_updates_inner(
152 updates.collect(),
153 &mut retractions,
154 &mut LocalExpressionCache::Closed,
155 )?;
156 builtin_table_updates.extend(builtin_table_update);
157 }
158
159 Ok(builtin_table_updates)
160 }
161
162 #[instrument(level = "debug")]
163 fn apply_updates_inner(
164 &mut self,
165 updates: Vec<StateUpdate>,
166 retractions: &mut InProgressRetractions,
167 local_expression_cache: &mut LocalExpressionCache,
168 ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
169 soft_assert_no_log!(
170 updates.iter().map(|update| update.ts).all_equal(),
171 "all timestamps should be equal: {updates:?}"
172 );
173
174 let mut update_system_config = false;
175
176 let mut builtin_table_updates = Vec::with_capacity(updates.len());
177 for StateUpdate { kind, ts: _, diff } in updates {
178 if matches!(kind, StateUpdateKind::SystemConfiguration(_)) {
179 update_system_config = true;
180 }
181
182 match diff {
183 StateDiff::Retraction => {
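// Pack the builtin table retraction before applying the update, while the retracted
// object still exists in the in-memory state.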
184 builtin_table_updates
187 .extend(self.generate_builtin_table_update(kind.clone(), diff));
188 self.apply_update(kind, diff, retractions, local_expression_cache)?;
189 }
190 StateDiff::Addition => {
191 self.apply_update(kind.clone(), diff, retractions, local_expression_cache)?;
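// Apply the addition first so the new object is visible when packing its builtin
// table update.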
192 builtin_table_updates
195 .extend(self.generate_builtin_table_update(kind.clone(), diff));
196 }
197 }
198 }
199
200 if update_system_config {
201 self.system_configuration.dyncfg_updates();
202 }
203
204 Ok(builtin_table_updates)
205 }
206
207 #[instrument(level = "debug")]
208 fn apply_update(
209 &mut self,
210 kind: StateUpdateKind,
211 diff: StateDiff,
212 retractions: &mut InProgressRetractions,
213 local_expression_cache: &mut LocalExpressionCache,
214 ) -> Result<(), CatalogError> {
215 match kind {
216 StateUpdateKind::Role(role) => {
217 self.apply_role_update(role, diff, retractions);
218 }
219 StateUpdateKind::RoleAuth(role_auth) => {
220 self.apply_role_auth_update(role_auth, diff, retractions);
221 }
222 StateUpdateKind::Database(database) => {
223 self.apply_database_update(database, diff, retractions);
224 }
225 StateUpdateKind::Schema(schema) => {
226 self.apply_schema_update(schema, diff, retractions);
227 }
228 StateUpdateKind::DefaultPrivilege(default_privilege) => {
229 self.apply_default_privilege_update(default_privilege, diff, retractions);
230 }
231 StateUpdateKind::SystemPrivilege(system_privilege) => {
232 self.apply_system_privilege_update(system_privilege, diff, retractions);
233 }
234 StateUpdateKind::SystemConfiguration(system_configuration) => {
235 self.apply_system_configuration_update(system_configuration, diff, retractions);
236 }
237 StateUpdateKind::Cluster(cluster) => {
238 self.apply_cluster_update(cluster, diff, retractions);
239 }
240 StateUpdateKind::NetworkPolicy(network_policy) => {
241 self.apply_network_policy_update(network_policy, diff, retractions);
242 }
243 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
244 self.apply_introspection_source_index_update(
245 introspection_source_index,
246 diff,
247 retractions,
248 );
249 }
250 StateUpdateKind::ClusterReplica(cluster_replica) => {
251 self.apply_cluster_replica_update(cluster_replica, diff, retractions);
252 }
253 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
254 self.apply_system_object_mapping_update(
255 system_object_mapping,
256 diff,
257 retractions,
258 local_expression_cache,
259 );
260 }
261 StateUpdateKind::TemporaryItem(item) => {
262 self.apply_temporary_item_update(item, diff, retractions);
263 }
264 StateUpdateKind::Item(item) => {
265 self.apply_item_update(item, diff, retractions, local_expression_cache)?;
266 }
267 StateUpdateKind::Comment(comment) => {
268 self.apply_comment_update(comment, diff, retractions);
269 }
270 StateUpdateKind::SourceReferences(source_reference) => {
271 self.apply_source_references_update(source_reference, diff, retractions);
272 }
273 StateUpdateKind::AuditLog(_audit_log) => {
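// Audit log events are not stored in the in-memory catalog state; they only
// produce builtin table updates.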
274 }
276 StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
277 self.apply_storage_collection_metadata_update(
278 storage_collection_metadata,
279 diff,
280 retractions,
281 );
282 }
283 StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
284 self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
285 }
286 }
287
288 Ok(())
289 }
290
291 #[instrument(level = "debug")]
292 fn apply_role_auth_update(
293 &mut self,
294 role_auth: mz_catalog::durable::RoleAuth,
295 diff: StateDiff,
296 retractions: &mut InProgressRetractions,
297 ) {
298 apply_with_update(
299 &mut self.role_auth_by_id,
300 role_auth,
301 |role_auth| role_auth.role_id,
302 diff,
303 &mut retractions.role_auths,
304 );
305 }
306
307 #[instrument(level = "debug")]
308 fn apply_role_update(
309 &mut self,
310 role: mz_catalog::durable::Role,
311 diff: StateDiff,
312 retractions: &mut InProgressRetractions,
313 ) {
314 apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
315 apply_with_update(
316 &mut self.roles_by_id,
317 role,
318 |role| role.id,
319 diff,
320 &mut retractions.roles,
321 );
322 }
323
324 #[instrument(level = "debug")]
325 fn apply_database_update(
326 &mut self,
327 database: mz_catalog::durable::Database,
328 diff: StateDiff,
329 retractions: &mut InProgressRetractions,
330 ) {
331 apply_inverted_lookup(
332 &mut self.database_by_name,
333 &database.name,
334 database.id,
335 diff,
336 );
337 apply_with_update(
338 &mut self.database_by_id,
339 database,
340 |database| database.id,
341 diff,
342 &mut retractions.databases,
343 );
344 }
345
346 #[instrument(level = "debug")]
347 fn apply_schema_update(
348 &mut self,
349 schema: mz_catalog::durable::Schema,
350 diff: StateDiff,
351 retractions: &mut InProgressRetractions,
352 ) {
353 let (schemas_by_id, schemas_by_name) = match &schema.database_id {
354 Some(database_id) => {
355 let db = self
356 .database_by_id
357 .get_mut(database_id)
358 .expect("catalog out of sync");
359 (&mut db.schemas_by_id, &mut db.schemas_by_name)
360 }
361 None => (
362 &mut self.ambient_schemas_by_id,
363 &mut self.ambient_schemas_by_name,
364 ),
365 };
366 apply_inverted_lookup(schemas_by_name, &schema.name, schema.id, diff);
367 apply_with_update(
368 schemas_by_id,
369 schema,
370 |schema| schema.id,
371 diff,
372 &mut retractions.schemas,
373 );
374 }
375
376 #[instrument(level = "debug")]
377 fn apply_default_privilege_update(
378 &mut self,
379 default_privilege: mz_catalog::durable::DefaultPrivilege,
380 diff: StateDiff,
381 _retractions: &mut InProgressRetractions,
382 ) {
383 match diff {
384 StateDiff::Addition => self
385 .default_privileges
386 .grant(default_privilege.object, default_privilege.acl_item),
387 StateDiff::Retraction => self
388 .default_privileges
389 .revoke(&default_privilege.object, &default_privilege.acl_item),
390 }
391 }
392
393 #[instrument(level = "debug")]
394 fn apply_system_privilege_update(
395 &mut self,
396 system_privilege: MzAclItem,
397 diff: StateDiff,
398 _retractions: &mut InProgressRetractions,
399 ) {
400 match diff {
401 StateDiff::Addition => self.system_privileges.grant(system_privilege),
402 StateDiff::Retraction => self.system_privileges.revoke(&system_privilege),
403 }
404 }
405
406 #[instrument(level = "debug")]
407 fn apply_system_configuration_update(
408 &mut self,
409 system_configuration: mz_catalog::durable::SystemConfiguration,
410 diff: StateDiff,
411 _retractions: &mut InProgressRetractions,
412 ) {
413 let res = match diff {
414 StateDiff::Addition => self.insert_system_configuration(
415 &system_configuration.name,
416 VarInput::Flat(&system_configuration.value),
417 ),
418 StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
419 };
420 match res {
421 Ok(_) => (),
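// A parameter stored in the catalog may be unknown to this build (for example, if it
// was written by a different version of Materialize); warn and continue instead of
// failing.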
422 Err(Error {
426 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
427 }) => {
428 warn!(%name, "unknown system parameter from catalog storage");
429 }
430 Err(e) => panic!("unable to update system variable: {e:?}"),
431 }
432 }
433
434 #[instrument(level = "debug")]
435 fn apply_cluster_update(
436 &mut self,
437 cluster: mz_catalog::durable::Cluster,
438 diff: StateDiff,
439 retractions: &mut InProgressRetractions,
440 ) {
441 apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
442 apply_with_update(
443 &mut self.clusters_by_id,
444 cluster,
445 |cluster| cluster.id,
446 diff,
447 &mut retractions.clusters,
448 );
449 }
450
451 #[instrument(level = "debug")]
452 fn apply_network_policy_update(
453 &mut self,
454 policy: mz_catalog::durable::NetworkPolicy,
455 diff: StateDiff,
456 retractions: &mut InProgressRetractions,
457 ) {
458 apply_inverted_lookup(
459 &mut self.network_policies_by_name,
460 &policy.name,
461 policy.id,
462 diff,
463 );
464 apply_with_update(
465 &mut self.network_policies_by_id,
466 policy,
467 |policy| policy.id,
468 diff,
469 &mut retractions.network_policies,
470 );
471 }
472
473 #[instrument(level = "debug")]
474 fn apply_introspection_source_index_update(
475 &mut self,
476 introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
477 diff: StateDiff,
478 retractions: &mut InProgressRetractions,
479 ) {
480 let cluster = self
481 .clusters_by_id
482 .get_mut(&introspection_source_index.cluster_id)
483 .expect("catalog out of sync");
484 let log = BUILTIN_LOG_LOOKUP
485 .get(introspection_source_index.name.as_str())
486 .expect("missing log");
487 apply_inverted_lookup(
488 &mut cluster.log_indexes,
489 &log.variant,
490 introspection_source_index.index_id,
491 diff,
492 );
493
494 match diff {
495 StateDiff::Addition => {
496 if let Some(mut entry) = retractions
497 .introspection_source_indexes
498 .remove(&introspection_source_index.item_id)
499 {
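// This addition matches an in-progress retraction of the same introspection source
// index, so rebuild the index item and reuse the retracted entry's identity.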
500 let (index_name, index) = self.create_introspection_source_index(
503 introspection_source_index.cluster_id,
504 log,
505 introspection_source_index.index_id,
506 );
507 assert_eq!(entry.id, introspection_source_index.item_id);
508 assert_eq!(entry.oid, introspection_source_index.oid);
509 assert_eq!(entry.name, index_name);
510 entry.item = index;
511 self.insert_entry(entry);
512 } else {
513 self.insert_introspection_source_index(
514 introspection_source_index.cluster_id,
515 log,
516 introspection_source_index.item_id,
517 introspection_source_index.index_id,
518 introspection_source_index.oid,
519 );
520 }
521 }
522 StateDiff::Retraction => {
523 let entry = self.drop_item(introspection_source_index.item_id);
524 retractions
525 .introspection_source_indexes
526 .insert(entry.id, entry);
527 }
528 }
529 }
530
531 #[instrument(level = "debug")]
532 fn apply_cluster_replica_update(
533 &mut self,
534 cluster_replica: mz_catalog::durable::ClusterReplica,
535 diff: StateDiff,
536 _retractions: &mut InProgressRetractions,
537 ) {
538 let cluster = self
539 .clusters_by_id
540 .get(&cluster_replica.cluster_id)
541 .expect("catalog out of sync");
542 let azs = cluster.availability_zones();
543 let location = self
544 .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
545 .expect("catalog in unexpected state");
546 let cluster = self
547 .clusters_by_id
548 .get_mut(&cluster_replica.cluster_id)
549 .expect("catalog out of sync");
550 apply_inverted_lookup(
551 &mut cluster.replica_id_by_name_,
552 &cluster_replica.name,
553 cluster_replica.replica_id,
554 diff,
555 );
556 match diff {
557 StateDiff::Retraction => {
558 let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
559 assert!(
560 prev.is_some(),
561 "retraction does not match existing value: {:?}",
562 cluster_replica.replica_id
563 );
564 }
565 StateDiff::Addition => {
566 let logging = ReplicaLogging {
567 log_logging: cluster_replica.config.logging.log_logging,
568 interval: cluster_replica.config.logging.interval,
569 };
570 let config = ReplicaConfig {
571 location,
572 compute: ComputeReplicaConfig { logging },
573 };
574 let mem_cluster_replica = ClusterReplica {
575 name: cluster_replica.name.clone(),
576 cluster_id: cluster_replica.cluster_id,
577 replica_id: cluster_replica.replica_id,
578 config,
579 owner_id: cluster_replica.owner_id,
580 };
581 let prev = cluster
582 .replicas_by_id_
583 .insert(cluster_replica.replica_id, mem_cluster_replica);
584 assert_eq!(
585 prev, None,
586 "values must be explicitly retracted before inserting a new value: {:?}",
587 cluster_replica.replica_id
588 );
589 }
590 }
591 }
592
593 #[instrument(level = "debug")]
594 fn apply_system_object_mapping_update(
595 &mut self,
596 system_object_mapping: mz_catalog::durable::SystemObjectMapping,
597 diff: StateDiff,
598 retractions: &mut InProgressRetractions,
599 local_expression_cache: &mut LocalExpressionCache,
600 ) {
601 let item_id = system_object_mapping.unique_identifier.catalog_id;
602 let global_id = system_object_mapping.unique_identifier.global_id;
603
604 if system_object_mapping.unique_identifier.runtime_alterable() {
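// Runtime-alterable system objects are not applied here; note the matching check in
// `generate_builtin_table_update`.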
605 return;
609 }
610
611 if let StateDiff::Retraction = diff {
612 let entry = self.drop_item(item_id);
613 retractions.system_object_mappings.insert(item_id, entry);
614 return;
615 }
616
617 if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
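// The addition matches an in-progress retraction of the same builtin item, so
// re-insert the existing entry rather than rebuilding it from scratch.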
618 self.insert_entry(entry);
623 return;
624 }
625
626 let builtin = BUILTIN_LOOKUP
627 .get(&system_object_mapping.description)
628 .expect("missing builtin")
629 .1;
630 let schema_name = builtin.schema();
631 let schema_id = self
632 .ambient_schemas_by_name
633 .get(schema_name)
634 .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
635 let name = QualifiedItemName {
636 qualifiers: ItemQualifiers {
637 database_spec: ResolvedDatabaseSpecifier::Ambient,
638 schema_spec: SchemaSpecifier::Id(*schema_id),
639 },
640 item: builtin.name().into(),
641 };
642 match builtin {
643 Builtin::Log(log) => {
644 let mut acl_items = vec![rbac::owner_privilege(
645 mz_sql::catalog::ObjectType::Source,
646 MZ_SYSTEM_ROLE_ID,
647 )];
648 acl_items.extend_from_slice(&log.access);
649 self.insert_item(
650 item_id,
651 log.oid,
652 name.clone(),
653 CatalogItem::Log(Log {
654 variant: log.variant,
655 global_id,
656 }),
657 MZ_SYSTEM_ROLE_ID,
658 PrivilegeMap::from_mz_acl_items(acl_items),
659 );
660 }
661
662 Builtin::Table(table) => {
663 let mut acl_items = vec![rbac::owner_privilege(
664 mz_sql::catalog::ObjectType::Table,
665 MZ_SYSTEM_ROLE_ID,
666 )];
667 acl_items.extend_from_slice(&table.access);
668
669 self.insert_item(
670 item_id,
671 table.oid,
672 name.clone(),
673 CatalogItem::Table(Table {
674 create_sql: None,
675 desc: VersionedRelationDesc::new(table.desc.clone()),
676 collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
677 conn_id: None,
678 resolved_ids: ResolvedIds::empty(),
679 custom_logical_compaction_window: table.is_retained_metrics_object.then(
680 || {
681 self.system_config()
682 .metrics_retention()
683 .try_into()
684 .expect("invalid metrics retention")
685 },
686 ),
687 is_retained_metrics_object: table.is_retained_metrics_object,
688 data_source: TableDataSource::TableWrites {
689 defaults: vec![Expr::null(); table.desc.arity()],
690 },
691 }),
692 MZ_SYSTEM_ROLE_ID,
693 PrivilegeMap::from_mz_acl_items(acl_items),
694 );
695 }
696 Builtin::Index(index) => {
697 let custom_logical_compaction_window =
698 index.is_retained_metrics_object.then(|| {
699 self.system_config()
700 .metrics_retention()
701 .try_into()
702 .expect("invalid metrics retention")
703 });
704 let versions = BTreeMap::new();
706
707 let item = self
708 .parse_item(
709 global_id,
710 &index.create_sql(),
711 &versions,
712 None,
713 index.is_retained_metrics_object,
714 custom_logical_compaction_window,
715 local_expression_cache,
716 None,
717 )
718 .unwrap_or_else(|e| {
719 panic!(
720 "internal error: failed to load bootstrap index:\n\
721 {}\n\
722 error:\n\
723 {:?}\n\n\
724 make sure that the schema name is specified in the builtin index's create sql statement.",
725 index.name, e
726 )
727 });
728 let CatalogItem::Index(_) = item else {
729 panic!(
730 "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
731 index.name
732 );
733 };
734
735 self.insert_item(
736 item_id,
737 index.oid,
738 name,
739 item,
740 MZ_SYSTEM_ROLE_ID,
741 PrivilegeMap::default(),
742 );
743 }
744 Builtin::View(_) => {
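// Builtin views are parsed and inserted by `parse_builtin_views`, not here.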
745 unreachable!("views added elsewhere");
747 }
748
749 Builtin::Type(typ) => {
751 let typ = self.resolve_builtin_type_references(typ);
752 if let CatalogType::Array { element_reference } = typ.details.typ {
753 let entry = self.get_entry_mut(&element_reference);
754 let item_type = match &mut entry.item {
755 CatalogItem::Type(item_type) => item_type,
756 _ => unreachable!("types can only reference other types"),
757 };
758 item_type.details.array_id = Some(item_id);
759 }
760
761 let desc = None;
765 assert!(!matches!(typ.details.typ, CatalogType::Record { .. }));
766 let schema_id = self.resolve_system_schema(typ.schema);
767
768 self.insert_item(
769 item_id,
770 typ.oid,
771 QualifiedItemName {
772 qualifiers: ItemQualifiers {
773 database_spec: ResolvedDatabaseSpecifier::Ambient,
774 schema_spec: SchemaSpecifier::Id(schema_id),
775 },
776 item: typ.name.to_owned(),
777 },
778 CatalogItem::Type(Type {
779 create_sql: None,
780 global_id,
781 details: typ.details.clone(),
782 desc,
783 resolved_ids: ResolvedIds::empty(),
784 }),
785 MZ_SYSTEM_ROLE_ID,
786 PrivilegeMap::from_mz_acl_items(vec![
787 rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
788 rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
789 ]),
790 );
791 }
792
793 Builtin::Func(func) => {
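// The item-level OID is unused for builtin functions; the individual function
// implementations in `func.inner` carry their own OIDs.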
794 let oid = INVALID_OID;
798 self.insert_item(
799 item_id,
800 oid,
801 name.clone(),
802 CatalogItem::Func(Func {
803 inner: func.inner,
804 global_id,
805 }),
806 MZ_SYSTEM_ROLE_ID,
807 PrivilegeMap::default(),
808 );
809 }
810
811 Builtin::Source(coll) => {
812 let mut acl_items = vec![rbac::owner_privilege(
813 mz_sql::catalog::ObjectType::Source,
814 MZ_SYSTEM_ROLE_ID,
815 )];
816 acl_items.extend_from_slice(&coll.access);
817
818 self.insert_item(
819 item_id,
820 coll.oid,
821 name.clone(),
822 CatalogItem::Source(Source {
823 create_sql: None,
824 data_source: DataSourceDesc::Introspection(coll.data_source),
825 desc: coll.desc.clone(),
826 global_id,
827 timeline: Timeline::EpochMilliseconds,
828 resolved_ids: ResolvedIds::empty(),
829 custom_logical_compaction_window: coll.is_retained_metrics_object.then(
830 || {
831 self.system_config()
832 .metrics_retention()
833 .try_into()
834 .expect("invalid metrics retention")
835 },
836 ),
837 is_retained_metrics_object: coll.is_retained_metrics_object,
838 }),
839 MZ_SYSTEM_ROLE_ID,
840 PrivilegeMap::from_mz_acl_items(acl_items),
841 );
842 }
843 Builtin::ContinualTask(ct) => {
844 let mut acl_items = vec![rbac::owner_privilege(
845 mz_sql::catalog::ObjectType::Source,
846 MZ_SYSTEM_ROLE_ID,
847 )];
848 acl_items.extend_from_slice(&ct.access);
849 let versions = BTreeMap::new();
851
852 let item = self
853 .parse_item(
854 global_id,
855 &ct.create_sql(),
856 &versions,
857 None,
858 false,
859 None,
860 local_expression_cache,
861 None,
862 )
863 .unwrap_or_else(|e| {
864 panic!(
865 "internal error: failed to load bootstrap continual task:\n\
866 {}\n\
867 error:\n\
868 {:?}\n\n\
869 make sure that the schema name is specified in the builtin continual task's create sql statement.",
870 ct.name, e
871 )
872 });
873 let CatalogItem::ContinualTask(_) = &item else {
874 panic!(
875 "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
876 ct.name
877 );
878 };
879
880 self.insert_item(
881 item_id,
882 ct.oid,
883 name,
884 item,
885 MZ_SYSTEM_ROLE_ID,
886 PrivilegeMap::from_mz_acl_items(acl_items),
887 );
888 }
889 Builtin::Connection(connection) => {
890 let versions = BTreeMap::new();
892 let mut item = self
893 .parse_item(
894 global_id,
895 connection.sql,
896 &versions,
897 None,
898 false,
899 None,
900 local_expression_cache,
901 None,
902 )
903 .unwrap_or_else(|e| {
904 panic!(
905 "internal error: failed to load bootstrap connection:\n\
906 {}\n\
907 error:\n\
908 {:?}\n\n\
909 make sure that the schema name is specified in the builtin connection's create sql statement.",
910 connection.name, e
911 )
912 });
913 let CatalogItem::Connection(_) = &mut item else {
914 panic!(
915 "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
916 connection.name
917 );
918 };
919
920 let mut acl_items = vec![rbac::owner_privilege(
921 mz_sql::catalog::ObjectType::Connection,
922 connection.owner_id.clone(),
923 )];
924 acl_items.extend_from_slice(connection.access);
925
926 self.insert_item(
927 item_id,
928 connection.oid,
929 name.clone(),
930 item,
931 connection.owner_id.clone(),
932 PrivilegeMap::from_mz_acl_items(acl_items),
933 );
934 }
935 }
936 }
937
938 #[instrument(level = "debug")]
939 fn apply_temporary_item_update(
940 &mut self,
941 TemporaryItem {
942 id,
943 oid,
944 name,
945 item,
946 owner_id,
947 privileges,
948 }: TemporaryItem,
949 diff: StateDiff,
950 retractions: &mut InProgressRetractions,
951 ) {
952 match diff {
953 StateDiff::Addition => {
954 let entry = match retractions.temp_items.remove(&id) {
955 Some(mut retraction) => {
956 assert_eq!(retraction.id, id);
957 retraction.item = item;
958 retraction.id = id;
959 retraction.oid = oid;
960 retraction.name = name;
961 retraction.owner_id = owner_id;
962 retraction.privileges = privileges;
963 retraction
964 }
965 None => CatalogEntry {
966 item,
967 referenced_by: Vec::new(),
968 used_by: Vec::new(),
969 id,
970 oid,
971 name,
972 owner_id,
973 privileges,
974 },
975 };
976 self.insert_entry(entry);
977 }
978 StateDiff::Retraction => {
979 let entry = self.drop_item(id);
980 retractions.temp_items.insert(id, entry);
981 }
982 }
983 }
984
985 #[instrument(level = "debug")]
986 fn apply_item_update(
987 &mut self,
988 item: mz_catalog::durable::Item,
989 diff: StateDiff,
990 retractions: &mut InProgressRetractions,
991 local_expression_cache: &mut LocalExpressionCache,
992 ) -> Result<(), CatalogError> {
993 match diff {
994 StateDiff::Addition => {
995 let key = item.key();
996 let mz_catalog::durable::Item {
997 id,
998 oid,
999 global_id,
1000 schema_id,
1001 name,
1002 create_sql,
1003 owner_id,
1004 privileges,
1005 extra_versions,
1006 } = item;
1007 let schema = self.find_non_temp_schema(&schema_id);
1008 let name = QualifiedItemName {
1009 qualifiers: ItemQualifiers {
1010 database_spec: schema.database().clone(),
1011 schema_spec: schema.id().clone(),
1012 },
1013 item: name.clone(),
1014 };
1015 let entry = match retractions.items.remove(&key) {
1016 Some(mut retraction) => {
1017 assert_eq!(retraction.id, item.id);
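// Only re-parse the item's SQL if it actually changed; otherwise the previously
// parsed item from the retraction is reused as-is.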
1018 if retraction.create_sql() != create_sql {
1023 let item = self
1024 .deserialize_item(
1025 global_id,
1026 &create_sql,
1027 &extra_versions,
1028 local_expression_cache,
1029 Some(retraction.item),
1030 )
1031 .unwrap_or_else(|e| {
1032 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1033 });
1034 retraction.item = item;
1035 }
1036 retraction.id = id;
1037 retraction.oid = oid;
1038 retraction.name = name;
1039 retraction.owner_id = owner_id;
1040 retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1041
1042 retraction
1043 }
1044 None => {
1045 let catalog_item = self
1046 .deserialize_item(
1047 global_id,
1048 &create_sql,
1049 &extra_versions,
1050 local_expression_cache,
1051 None,
1052 )
1053 .unwrap_or_else(|e| {
1054 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1055 });
1056 CatalogEntry {
1057 item: catalog_item,
1058 referenced_by: Vec::new(),
1059 used_by: Vec::new(),
1060 id,
1061 oid,
1062 name,
1063 owner_id,
1064 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1065 }
1066 }
1067 };
1068
1069 self.insert_entry(entry);
1070 }
1071 StateDiff::Retraction => {
1072 let entry = self.drop_item(item.id);
1073 let key = item.into_key_value().0;
1074 retractions.items.insert(key, entry);
1075 }
1076 }
1077 Ok(())
1078 }
1079
1080 #[instrument(level = "debug")]
1081 fn apply_comment_update(
1082 &mut self,
1083 comment: mz_catalog::durable::Comment,
1084 diff: StateDiff,
1085 _retractions: &mut InProgressRetractions,
1086 ) {
1087 match diff {
1088 StateDiff::Addition => {
1089 let prev = self.comments.update_comment(
1090 comment.object_id,
1091 comment.sub_component,
1092 Some(comment.comment),
1093 );
1094 assert_eq!(
1095 prev, None,
1096 "values must be explicitly retracted before inserting a new value"
1097 );
1098 }
1099 StateDiff::Retraction => {
1100 let prev =
1101 self.comments
1102 .update_comment(comment.object_id, comment.sub_component, None);
1103 assert_eq!(
1104 prev,
1105 Some(comment.comment),
1106 "retraction does not match existing value: ({:?}, {:?})",
1107 comment.object_id,
1108 comment.sub_component,
1109 );
1110 }
1111 }
1112 }
1113
1114 #[instrument(level = "debug")]
1115 fn apply_source_references_update(
1116 &mut self,
1117 source_references: mz_catalog::durable::SourceReferences,
1118 diff: StateDiff,
1119 _retractions: &mut InProgressRetractions,
1120 ) {
1121 match diff {
1122 StateDiff::Addition => {
1123 let prev = self
1124 .source_references
1125 .insert(source_references.source_id, source_references.into());
1126 assert!(
1127 prev.is_none(),
1128 "values must be explicitly retracted before inserting a new value: {prev:?}"
1129 );
1130 }
1131 StateDiff::Retraction => {
1132 let prev = self.source_references.remove(&source_references.source_id);
1133 assert!(
1134 prev.is_some(),
1135 "retraction for a non-existent existing value: {source_references:?}"
1136 );
1137 }
1138 }
1139 }
1140
1141 #[instrument(level = "debug")]
1142 fn apply_storage_collection_metadata_update(
1143 &mut self,
1144 storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1145 diff: StateDiff,
1146 _retractions: &mut InProgressRetractions,
1147 ) {
1148 apply_inverted_lookup(
1149 &mut self.storage_metadata.collection_metadata,
1150 &storage_collection_metadata.id,
1151 storage_collection_metadata.shard,
1152 diff,
1153 );
1154 }
1155
1156 #[instrument(level = "debug")]
1157 fn apply_unfinalized_shard_update(
1158 &mut self,
1159 unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1160 diff: StateDiff,
1161 _retractions: &mut InProgressRetractions,
1162 ) {
1163 match diff {
1164 StateDiff::Addition => {
1165 let newly_inserted = self
1166 .storage_metadata
1167 .unfinalized_shards
1168 .insert(unfinalized_shard.shard);
1169 assert!(
1170 newly_inserted,
1171 "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1172 );
1173 }
1174 StateDiff::Retraction => {
1175 let removed = self
1176 .storage_metadata
1177 .unfinalized_shards
1178 .remove(&unfinalized_shard.shard);
1179 assert!(
1180 removed,
1181 "retraction does not match existing value: {unfinalized_shard:?}"
1182 );
1183 }
1184 }
1185 }
1186
1187 #[instrument]
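/// Generates and resolves builtin table updates for `updates` without modifying the
/// in-memory catalog state.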
1190 pub(crate) fn generate_builtin_table_updates(
1191 &self,
1192 updates: Vec<StateUpdate>,
1193 ) -> Vec<BuiltinTableUpdate> {
1194 let mut builtin_table_updates = Vec::new();
1195 for StateUpdate { kind, ts: _, diff } in updates {
1196 let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1197 let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1198 builtin_table_updates.extend(builtin_table_update);
1199 }
1200 builtin_table_updates
1201 }
1202
1203 #[instrument(level = "debug")]
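/// Generates the builtin table updates that correspond to a single catalog state update.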
1206 pub(crate) fn generate_builtin_table_update(
1207 &self,
1208 kind: StateUpdateKind,
1209 diff: StateDiff,
1210 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1211 let diff = diff.into();
1212 match kind {
1213 StateUpdateKind::Role(role) => {
1214 let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1215 for group_id in role.membership.map.keys() {
1216 builtin_table_updates
1217 .push(self.pack_role_members_update(*group_id, role.id, diff))
1218 }
1219 builtin_table_updates
1220 }
1221 StateUpdateKind::Database(database) => {
1222 vec![self.pack_database_update(&database.id, diff)]
1223 }
1224 StateUpdateKind::Schema(schema) => {
1225 let db_spec = schema.database_id.into();
1226 vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1227 }
1228 StateUpdateKind::DefaultPrivilege(default_privilege) => {
1229 vec![self.pack_default_privileges_update(
1230 &default_privilege.object,
1231 &default_privilege.acl_item.grantee,
1232 &default_privilege.acl_item.acl_mode,
1233 diff,
1234 )]
1235 }
1236 StateUpdateKind::SystemPrivilege(system_privilege) => {
1237 vec![self.pack_system_privileges_update(system_privilege, diff)]
1238 }
1239 StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1240 StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1241 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1242 self.pack_item_update(introspection_source_index.item_id, diff)
1243 }
1244 StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1245 cluster_replica.cluster_id,
1246 &cluster_replica.name,
1247 diff,
1248 ),
1249 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
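// Runtime-alterable system objects do not get their builtin table updates here; see
// the matching skip in `apply_system_object_mapping_update`.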
1250 if !system_object_mapping.unique_identifier.runtime_alterable() {
1254 self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1255 } else {
1256 vec![]
1257 }
1258 }
1259 StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1260 StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1261 StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1262 comment.object_id,
1263 comment.sub_component,
1264 &comment.comment,
1265 diff,
1266 )],
1267 StateUpdateKind::SourceReferences(source_references) => {
1268 self.pack_source_references_update(&source_references, diff)
1269 }
1270 StateUpdateKind::AuditLog(audit_log) => {
1271 vec![
1272 self.pack_audit_log_update(&audit_log.event, diff)
1273 .expect("could not pack audit log update"),
1274 ]
1275 }
1276 StateUpdateKind::NetworkPolicy(policy) => self
1277 .pack_network_policy_update(&policy.id, diff)
1278 .expect("could not pack audit log update"),
1279 StateUpdateKind::StorageCollectionMetadata(_)
1280 | StateUpdateKind::UnfinalizedShard(_)
1281 | StateUpdateKind::RoleAuth(_) => Vec::new(),
1282 }
1283 }
1284
1285 fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1286 self.entry_by_id
1287 .get_mut(id)
1288 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1289 }
1290
1291 fn get_schema_mut(
1292 &mut self,
1293 database_spec: &ResolvedDatabaseSpecifier,
1294 schema_spec: &SchemaSpecifier,
1295 conn_id: &ConnectionId,
1296 ) -> &mut Schema {
1297 match (database_spec, schema_spec) {
1299 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1300 .temporary_schemas
1301 .get_mut(conn_id)
1302 .expect("catalog out of sync"),
1303 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1304 .ambient_schemas_by_id
1305 .get_mut(id)
1306 .expect("catalog out of sync"),
1307 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1308 .database_by_id
1309 .get_mut(database_id)
1310 .expect("catalog out of sync")
1311 .schemas_by_id
1312 .get_mut(schema_id)
1313 .expect("catalog out of sync"),
1314 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1315 unreachable!("temporary schemas are in the ambient database")
1316 }
1317 }
1318 }
1319
1320 #[instrument(name = "catalog::parse_views")]
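/// Parses the given builtin views and inserts them into `state`, returning the builtin
/// table updates for the inserted views.
///
/// Builtin views may depend on one another and the dependency order is not known up front,
/// so parsing is performed speculatively on separate tasks: a view whose dependency is not
/// yet present is parked until that dependency (identified by ID or by name) has been
/// inserted, and is then retried.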
1330 async fn parse_builtin_views(
1331 state: &mut CatalogState,
1332 builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
1333 retractions: &mut InProgressRetractions,
1334 local_expression_cache: &mut LocalExpressionCache,
1335 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1336 let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
1337 let (updates, additions): (Vec<_>, Vec<_>) =
1338 builtin_views
1339 .into_iter()
1340 .partition_map(|(view, item_id, gid)| {
1341 match retractions.system_object_mappings.remove(&item_id) {
1342 Some(entry) => Either::Left(entry),
1343 None => Either::Right((view, item_id, gid)),
1344 }
1345 });
1346
1347 for entry in updates {
1348 let item_id = entry.id();
1353 state.insert_entry(entry);
1354 builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
1355 }
1356
1357 let mut handles = Vec::new();
1358 let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
1359 BTreeMap::new();
1360 let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
1361 let mut awaiting_all = Vec::new();
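// Track the views that have already been inserted, by ID and by fully qualified name,
// so parked views can be moved back to `ready` as soon as their dependency appears.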
1364 let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
1366 let mut completed_names: BTreeSet<String> = BTreeSet::new();
1367
1368 let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
1370 .into_iter()
1371 .map(|(view, item_id, gid)| (item_id, (view, gid)))
1372 .collect();
1373 let item_ids: Vec<_> = views.keys().copied().collect();
1374
1375 let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
1376 while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
1377 if handles.is_empty() && ready.is_empty() {
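// Nothing is in flight and nothing is ready, so retry everything that was deferred to
// `awaiting_all`.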
1378 ready.extend(awaiting_all.drain(..));
1380 }
1381
1382 if !ready.is_empty() {
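// Spawn a parse task for every ready view against a snapshot of the current state.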
1384 let spawn_state = Arc::new(state.clone());
1385 while let Some(id) = ready.pop_front() {
1386 let (view, global_id) = views.get(&id).expect("must exist");
1387 let global_id = *global_id;
1388 let create_sql = view.create_sql();
1389 let versions = BTreeMap::new();
1391
1392 let span = info_span!(parent: None, "parse builtin view", name = view.name);
1393 OpenTelemetryContext::obtain().attach_as_parent_to(&span);
1394 let task_state = Arc::clone(&spawn_state);
1395 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1396 let handle = mz_ore::task::spawn(
1397 || "parse view",
1398 async move {
1399 let res = task_state.parse_item_inner(
1400 global_id,
1401 &create_sql,
1402 &versions,
1403 None,
1404 false,
1405 None,
1406 cached_expr,
1407 None,
1408 );
1409 (id, global_id, res)
1410 }
1411 .instrument(span),
1412 );
1413 handles.push(handle);
1414 }
1415 }
1416
1417 let (handle, _idx, remaining) = future::select_all(handles).await;
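// Wait for the next parse task to finish and keep the remaining tasks for later
// iterations.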
1419 handles = remaining;
1420 let (id, global_id, res) = handle.expect("must join");
1421 let mut insert_cached_expr = |cached_expr| {
1422 if let Some(cached_expr) = cached_expr {
1423 local_expression_cache.insert_cached_expression(global_id, cached_expr);
1424 }
1425 };
1426 match res {
1427 Ok((item, uncached_expr)) => {
1428 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1429 local_expression_cache.insert_uncached_expression(
1430 global_id,
1431 uncached_expr,
1432 optimizer_features,
1433 );
1434 }
1435 let (view, _gid) = views.remove(&id).expect("must exist");
1437 let schema_id = state
1438 .ambient_schemas_by_name
1439 .get(view.schema)
1440 .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
1441 let qname = QualifiedItemName {
1442 qualifiers: ItemQualifiers {
1443 database_spec: ResolvedDatabaseSpecifier::Ambient,
1444 schema_spec: SchemaSpecifier::Id(*schema_id),
1445 },
1446 item: view.name.into(),
1447 };
1448 let mut acl_items = vec![rbac::owner_privilege(
1449 mz_sql::catalog::ObjectType::View,
1450 MZ_SYSTEM_ROLE_ID,
1451 )];
1452 acl_items.extend_from_slice(&view.access);
1453
1454 state.insert_item(
1455 id,
1456 view.oid,
1457 qname,
1458 item,
1459 MZ_SYSTEM_ROLE_ID,
1460 PrivilegeMap::from_mz_acl_items(acl_items),
1461 );
1462
1463 let mut resolved_dependent_items = Vec::new();
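// Re-enqueue any views that were waiting on this view's ID or fully qualified name.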
1465 if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
1466 resolved_dependent_items.extend(dependent_items);
1467 }
1468 let entry = state.get_entry(&id);
1469 let full_name = state.resolve_full_name(entry.name(), None).to_string();
1470 if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
1471 resolved_dependent_items.extend(dependent_items);
1472 }
1473 ready.extend(resolved_dependent_items);
1474
1475 completed_ids.insert(id);
1476 completed_names.insert(full_name);
1477 }
1478 Err((
1480 AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
1481 cached_expr,
1482 )) => {
1483 insert_cached_expr(cached_expr);
1484 if completed_ids.contains(&missing_dep) {
1485 ready.push_back(id);
1486 } else {
1487 awaiting_id_dependencies
1488 .entry(missing_dep)
1489 .or_default()
1490 .push(id);
1491 }
1492 }
1493 Err((
1495 AdapterError::PlanError(plan::PlanError::Catalog(
1496 SqlCatalogError::UnknownItem(missing_dep),
1497 )),
1498 cached_expr,
1499 )) => {
1500 insert_cached_expr(cached_expr);
1501 match CatalogItemId::from_str(&missing_dep) {
1502 Ok(missing_dep) => {
1503 if completed_ids.contains(&missing_dep) {
1504 ready.push_back(id);
1505 } else {
1506 awaiting_id_dependencies
1507 .entry(missing_dep)
1508 .or_default()
1509 .push(id);
1510 }
1511 }
1512 Err(_) => {
1513 if completed_names.contains(&missing_dep) {
1514 ready.push_back(id);
1515 } else {
1516 awaiting_name_dependencies
1517 .entry(missing_dep)
1518 .or_default()
1519 .push(id);
1520 }
1521 }
1522 }
1523 }
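// An invalid cast may become valid once more builtin items exist, so retry this view
// only after everything else has been processed.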
1524 Err((
1525 AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
1526 cached_expr,
1527 )) => {
1528 insert_cached_expr(cached_expr);
1529 awaiting_all.push(id);
1530 }
1531 Err((e, _)) => {
1532 let (bad_view, _gid) = views.get(&id).expect("must exist");
1533 panic!(
1534 "internal error: failed to load bootstrap view:\n\
1535 {name}\n\
1536 error:\n\
1537 {e:?}\n\n\
1538 Make sure that the schema name is specified in the builtin view's create sql statement.
1539 ",
1540 name = bad_view.name,
1541 )
1542 }
1543 }
1544 }
1545
1546 assert!(awaiting_id_dependencies.is_empty());
1547 assert!(
1548 awaiting_name_dependencies.is_empty(),
1549 "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
1550 );
1551 assert!(awaiting_all.is_empty());
1552 assert!(views.is_empty());
1553
1554 builtin_table_updates.extend(
1556 item_ids
1557 .into_iter()
1558 .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
1559 );
1560
1561 builtin_table_updates
1562 }
1563
1564 fn insert_entry(&mut self, entry: CatalogEntry) {
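// Track which non-system objects are bound to which cluster.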
1566 if !entry.id.is_system() {
1567 if let Some(cluster_id) = entry.item.cluster_id() {
1568 self.clusters_by_id
1569 .get_mut(&cluster_id)
1570 .expect("catalog out of sync")
1571 .bound_objects
1572 .insert(entry.id);
1573 };
1574 }
1575
1576 for u in entry.references().items() {
1577 match self.entry_by_id.get_mut(u) {
1578 Some(metadata) => metadata.referenced_by.push(entry.id()),
1579 None => panic!(
1580 "Catalog: missing dependent catalog item {} while installing {}",
1581 &u,
1582 self.resolve_full_name(entry.name(), entry.conn_id())
1583 ),
1584 }
1585 }
1586 for u in entry.uses() {
1587 if u == entry.id() {
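// An item can appear in its own `uses()`; don't record a self-dependency.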
1590 continue;
1591 }
1592 match self.entry_by_id.get_mut(&u) {
1593 Some(metadata) => metadata.used_by.push(entry.id()),
1594 None => panic!(
1595 "Catalog: missing dependent catalog item {} while installing {}",
1596 &u,
1597 self.resolve_full_name(entry.name(), entry.conn_id())
1598 ),
1599 }
1600 }
1601 for gid in entry.item.global_ids() {
1602 self.entry_by_global_id.insert(gid, entry.id());
1603 }
1604 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1605 let schema = self.get_schema_mut(
1606 &entry.name().qualifiers.database_spec,
1607 &entry.name().qualifiers.schema_spec,
1608 conn_id,
1609 );
1610
1611 let prev_id = match entry.item() {
1612 CatalogItem::Func(_) => schema
1613 .functions
1614 .insert(entry.name().item.clone(), entry.id()),
1615 CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1616 _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1617 };
1618
1619 assert!(
1620 prev_id.is_none(),
1621 "builtin name collision on {:?}",
1622 entry.name().item.clone()
1623 );
1624
1625 self.entry_by_id.insert(entry.id(), entry.clone());
1626 }
1627
1628 fn insert_item(
1630 &mut self,
1631 id: CatalogItemId,
1632 oid: u32,
1633 name: QualifiedItemName,
1634 item: CatalogItem,
1635 owner_id: RoleId,
1636 privileges: PrivilegeMap,
1637 ) {
1638 let entry = CatalogEntry {
1639 item,
1640 name,
1641 id,
1642 oid,
1643 used_by: Vec::new(),
1644 referenced_by: Vec::new(),
1645 owner_id,
1646 privileges,
1647 };
1648
1649 self.insert_entry(entry);
1650 }
1651
1652 #[mz_ore::instrument(level = "trace")]
1653 fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1654 let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1655 for u in metadata.references().items() {
1656 if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1657 dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1658 }
1659 }
1660 for u in metadata.uses() {
1661 if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1662 dep_metadata.used_by.retain(|u| *u != metadata.id())
1663 }
1664 }
1665 for gid in metadata.global_ids() {
1666 self.entry_by_global_id.remove(&gid);
1667 }
1668
1669 let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1670 let schema = self.get_schema_mut(
1671 &metadata.name().qualifiers.database_spec,
1672 &metadata.name().qualifiers.schema_spec,
1673 conn_id,
1674 );
1675 if metadata.item_type() == CatalogItemType::Type {
1676 schema
1677 .types
1678 .remove(&metadata.name().item)
1679 .expect("catalog out of sync");
1680 } else {
1681 assert_ne!(metadata.item_type(), CatalogItemType::Func);
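// The assertion above means functions never reach this branch, so nothing ever needs
// to be removed from `schema.functions`.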
1684
1685 schema
1686 .items
1687 .remove(&metadata.name().item)
1688 .expect("catalog out of sync");
1689 };
1690
1691 if !id.is_system() {
1692 if let Some(cluster_id) = metadata.item().cluster_id() {
1693 assert!(
1694 self.clusters_by_id
1695 .get_mut(&cluster_id)
1696 .expect("catalog out of sync")
1697 .bound_objects
1698 .remove(&id),
1699 "catalog out of sync"
1700 );
1701 }
1702 }
1703
1704 metadata
1705 }
1706
1707 fn insert_introspection_source_index(
1708 &mut self,
1709 cluster_id: ClusterId,
1710 log: &'static BuiltinLog,
1711 item_id: CatalogItemId,
1712 global_id: GlobalId,
1713 oid: u32,
1714 ) {
1715 let (index_name, index) =
1716 self.create_introspection_source_index(cluster_id, log, global_id);
1717 self.insert_item(
1718 item_id,
1719 oid,
1720 index_name,
1721 index,
1722 MZ_SYSTEM_ROLE_ID,
1723 PrivilegeMap::default(),
1724 );
1725 }
1726
1727 fn create_introspection_source_index(
1728 &self,
1729 cluster_id: ClusterId,
1730 log: &'static BuiltinLog,
1731 global_id: GlobalId,
1732 ) -> (QualifiedItemName, CatalogItem) {
1733 let source_name = FullItemName {
1734 database: RawDatabaseSpecifier::Ambient,
1735 schema: log.schema.into(),
1736 item: log.name.into(),
1737 };
1738 let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1739 let mut index_name = QualifiedItemName {
1740 qualifiers: ItemQualifiers {
1741 database_spec: ResolvedDatabaseSpecifier::Ambient,
1742 schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1743 },
1744 item: index_name.clone(),
1745 };
1746 index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1747 let index_item_name = index_name.item.clone();
1748 let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1749 let index = CatalogItem::Index(Index {
1750 global_id,
1751 on: log_global_id,
1752 keys: log
1753 .variant
1754 .index_by()
1755 .into_iter()
1756 .map(MirScalarExpr::Column)
1757 .collect(),
1758 create_sql: index_sql(
1759 index_item_name,
1760 cluster_id,
1761 source_name,
1762 &log.variant.desc(),
1763 &log.variant.index_by(),
1764 ),
1765 conn_id: None,
1766 resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1767 cluster_id,
1768 is_retained_metrics_object: false,
1769 custom_logical_compaction_window: None,
1770 });
1771 (index_name, index)
1772 }
1773
1774 fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1779 Ok(self.system_configuration.set(name, value)?)
1780 }
1781
1782 fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1787 Ok(self.system_configuration.reset(name)?)
1788 }
1789}
1790
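/// Sorts `updates` into the order in which they should be applied: grouped by timestamp,
/// with the updates within each timestamp ordered by `sort_updates_inner`.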
1791fn sort_updates(mut updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1793 let mut sorted_updates = Vec::with_capacity(updates.len());
1794
1795 updates.sort_by_key(|update| update.ts);
1796 for (_, updates) in &updates.into_iter().group_by(|update| update.ts) {
1797 let sorted_ts_updates = sort_updates_inner(updates.collect());
1798 sorted_updates.extend(sorted_ts_updates);
1799 }
1800
1801 sorted_updates
1802}
1803
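/// Sorts updates that all share a single timestamp into application order: retractions are
/// emitted first in reverse dependency order, followed by additions in dependency order.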
1804fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1806 fn push_update<T>(
1807 update: T,
1808 diff: StateDiff,
1809 retractions: &mut Vec<T>,
1810 additions: &mut Vec<T>,
1811 ) {
1812 match diff {
1813 StateDiff::Retraction => retractions.push(update),
1814 StateDiff::Addition => additions.push(update),
1815 }
1816 }
1817
1818 soft_assert_no_log!(
1819 updates.iter().map(|update| update.ts).all_equal(),
1820 "all timestamps should be equal: {updates:?}"
1821 );
1822
1823 let mut pre_cluster_retractions = Vec::new();
1825 let mut pre_cluster_additions = Vec::new();
1826 let mut cluster_retractions = Vec::new();
1827 let mut cluster_additions = Vec::new();
1828 let mut builtin_item_updates = Vec::new();
1829 let mut item_retractions = Vec::new();
1830 let mut item_additions = Vec::new();
1831 let mut temp_item_retractions = Vec::new();
1832 let mut temp_item_additions = Vec::new();
1833 let mut post_item_retractions = Vec::new();
1834 let mut post_item_additions = Vec::new();
1835 for update in updates {
1836 let diff = update.diff.clone();
1837 match update.kind {
1838 StateUpdateKind::Role(_)
1839 | StateUpdateKind::RoleAuth(_)
1840 | StateUpdateKind::Database(_)
1841 | StateUpdateKind::Schema(_)
1842 | StateUpdateKind::DefaultPrivilege(_)
1843 | StateUpdateKind::SystemPrivilege(_)
1844 | StateUpdateKind::SystemConfiguration(_)
1845 | StateUpdateKind::NetworkPolicy(_) => push_update(
1846 update,
1847 diff,
1848 &mut pre_cluster_retractions,
1849 &mut pre_cluster_additions,
1850 ),
1851 StateUpdateKind::Cluster(_)
1852 | StateUpdateKind::IntrospectionSourceIndex(_)
1853 | StateUpdateKind::ClusterReplica(_) => push_update(
1854 update,
1855 diff,
1856 &mut cluster_retractions,
1857 &mut cluster_additions,
1858 ),
1859 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1860 builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
1861 }
1862 StateUpdateKind::TemporaryItem(item) => push_update(
1863 (item, update.ts, update.diff),
1864 diff,
1865 &mut temp_item_retractions,
1866 &mut temp_item_additions,
1867 ),
1868 StateUpdateKind::Item(item) => push_update(
1869 (item, update.ts, update.diff),
1870 diff,
1871 &mut item_retractions,
1872 &mut item_additions,
1873 ),
1874 StateUpdateKind::Comment(_)
1875 | StateUpdateKind::SourceReferences(_)
1876 | StateUpdateKind::AuditLog(_)
1877 | StateUpdateKind::StorageCollectionMetadata(_)
1878 | StateUpdateKind::UnfinalizedShard(_) => push_update(
1879 update,
1880 diff,
1881 &mut post_item_retractions,
1882 &mut post_item_additions,
1883 ),
1884 }
1885 }
1886
1887 let builtin_item_updates = builtin_item_updates
1889 .into_iter()
1890 .map(|(system_object_mapping, ts, diff)| {
1891 let idx = BUILTIN_LOOKUP
1892 .get(&system_object_mapping.description)
1893 .expect("missing builtin")
1894 .0;
1895 (idx, system_object_mapping, ts, diff)
1896 })
1897 .sorted_by_key(|(idx, _, _, _)| *idx)
1898 .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
1899
1900 let mut other_builtin_retractions = Vec::new();
1902 let mut other_builtin_additions = Vec::new();
1903 let mut builtin_index_retractions = Vec::new();
1904 let mut builtin_index_additions = Vec::new();
1905 for (builtin_item_update, ts, diff) in builtin_item_updates {
1906 match &builtin_item_update.description.object_type {
1907 CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
1908 StateUpdate {
1909 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
1910 ts,
1911 diff,
1912 },
1913 diff,
1914 &mut builtin_index_retractions,
1915 &mut builtin_index_additions,
1916 ),
1917 CatalogItemType::Table
1918 | CatalogItemType::Source
1919 | CatalogItemType::Sink
1920 | CatalogItemType::View
1921 | CatalogItemType::MaterializedView
1922 | CatalogItemType::Type
1923 | CatalogItemType::Func
1924 | CatalogItemType::Secret
1925 | CatalogItemType::Connection => push_update(
1926 StateUpdate {
1927 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
1928 ts,
1929 diff,
1930 },
1931 diff,
1932 &mut other_builtin_retractions,
1933 &mut other_builtin_additions,
1934 ),
1935 }
1936 }
1937
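/// Sorts item updates into a dependency-friendly order: types, then functions, secrets,
/// connections, sources, tables, derived items (views, materialized views, indexes),
/// sinks, and finally continual tasks. Within each group, updates are sorted by item ID;
/// an item is created after its dependencies, so sorting by ID keeps dependencies first.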
1938 fn sort_item_updates(
1954 item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
1955 ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
1956 let mut types = Vec::new();
1959 let mut funcs = Vec::new();
1962 let mut secrets = Vec::new();
1963 let mut connections = Vec::new();
1964 let mut sources = Vec::new();
1965 let mut tables = Vec::new();
1966 let mut derived_items = Vec::new();
1967 let mut sinks = Vec::new();
1968 let mut continual_tasks = Vec::new();
1969
1970 for update in item_updates {
1971 match update.0.item_type() {
1972 CatalogItemType::Type => types.push(update),
1973 CatalogItemType::Func => funcs.push(update),
1974 CatalogItemType::Secret => secrets.push(update),
1975 CatalogItemType::Connection => connections.push(update),
1976 CatalogItemType::Source => sources.push(update),
1977 CatalogItemType::Table => tables.push(update),
1978 CatalogItemType::View
1979 | CatalogItemType::MaterializedView
1980 | CatalogItemType::Index => derived_items.push(update),
1981 CatalogItemType::Sink => sinks.push(update),
1982 CatalogItemType::ContinualTask => continual_tasks.push(update),
1983 }
1984 }
1985
1986 for group in [
1988 &mut types,
1989 &mut funcs,
1990 &mut secrets,
1991 &mut connections,
1992 &mut sources,
1993 &mut tables,
1994 &mut derived_items,
1995 &mut sinks,
1996 &mut continual_tasks,
1997 ] {
1998 group.sort_by_key(|(item, _, _)| item.id);
1999 }
2000
2001 iter::empty()
2002 .chain(types)
2003 .chain(funcs)
2004 .chain(secrets)
2005 .chain(connections)
2006 .chain(sources)
2007 .chain(tables)
2008 .chain(derived_items)
2009 .chain(sinks)
2010 .chain(continual_tasks)
2011 .collect()
2012 }
2013
2014 let item_retractions = sort_item_updates(item_retractions);
2015 let item_additions = sort_item_updates(item_additions);
2016
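/// Same ordering as `sort_item_updates`, but for temporary items.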
    fn sort_temp_item_updates(
        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in temp_item_updates {
            match update.0.item.typ() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut connections,
            &mut sources,
            &mut tables,
            &mut derived_items,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }
    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
    let temp_item_additions = sort_temp_item_updates(temp_item_additions);

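    /// Merges the two ID-sorted queues of durable and temporary item updates into a single
    /// list of `StateUpdate`s ordered by item ID.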
    fn merge_item_updates(
        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> Vec<StateUpdate> {
        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());

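        // Merge the two ID-sorted queues, taking the update with the smaller item ID first;
        // a durable item and a temporary item can never share an ID.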
        while let (Some((item, _, _)), Some((temp_item, _, _))) =
            (item_updates.front(), temp_item_updates.front())
        {
            if item.id < temp_item.id {
                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::Item(item),
                    ts,
                    diff,
                });
            } else if item.id > temp_item.id {
                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::TemporaryItem(temp_item),
                    ts,
                    diff,
                });
            } else {
                unreachable!(
                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
                );
            }
        }

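        // At most one queue is still non-empty; drain whatever remains in order.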
        while let Some((item, ts, diff)) = item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::Item(item),
                ts,
                diff,
            });
        }

        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::TemporaryItem(temp_item),
                ts,
                diff,
            });
        }

        state_updates
    }
    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
    let item_additions = merge_item_updates(item_additions, temp_item_additions);

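    // Retractions are applied in the reverse order of additions: the groups are emitted in
    // opposite order and each retraction group is itself reversed.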
    iter::empty()
        .chain(post_item_retractions.into_iter().rev())
        .chain(item_retractions.into_iter().rev())
        .chain(builtin_index_retractions.into_iter().rev())
        .chain(cluster_retractions.into_iter().rev())
        .chain(other_builtin_retractions.into_iter().rev())
        .chain(pre_cluster_retractions.into_iter().rev())
        .chain(pre_cluster_additions)
        .chain(other_builtin_additions)
        .chain(cluster_additions)
        .chain(builtin_index_additions)
        .chain(item_additions)
        .chain(post_item_additions)
        .collect()
}

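/// Pending updates accumulated while applying bootstrap updates, grouped so that runs of
/// consecutive updates of the same kind can be applied as a single batch.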
enum BootstrapApplyState {
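    /// A run of builtin view additions, which are parsed together via
    /// `CatalogState::parse_builtin_views`.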
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
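    /// A run of item-like updates (introspection source indexes, system object mappings, and
    /// items), which are applied with item parsing features enabled.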
    Items(Vec<StateUpdate>),
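    /// A run of all other kinds of updates.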
    Updates(Vec<StateUpdate>),
}

impl BootstrapApplyState {
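    /// Starts a new batch containing just `update`, choosing the batch kind from the update's
    /// kind and diff.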
    fn new(update: StateUpdate) -> BootstrapApplyState {
        match update {
            StateUpdate {
                kind: StateUpdateKind::SystemObjectMapping(system_object_mapping),
                diff: StateDiff::Addition,
                ..
            } if matches!(
                system_object_mapping.description.object_type,
                CatalogItemType::View
            ) =>
            {
                let view_addition = lookup_builtin_view_addition(system_object_mapping);
                BootstrapApplyState::BuiltinViewAdditions(vec![view_addition])
            }
            StateUpdate {
                kind: StateUpdateKind::IntrospectionSourceIndex(_),
                ..
            }
            | StateUpdate {
                kind: StateUpdateKind::SystemObjectMapping(_),
                ..
            }
            | StateUpdate {
                kind: StateUpdateKind::Item(_),
                ..
            } => BootstrapApplyState::Items(vec![update]),
            update => BootstrapApplyState::Updates(vec![update]),
        }
    }

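    /// Applies the batched updates to `state` and returns the builtin table updates they
    /// generate. Builtin view additions and item updates are applied with item parsing
    /// features enabled; for builtin views the previous system configuration is restored
    /// afterwards.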
    async fn apply(
        self,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match self {
            BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions) => {
                let restore = state.system_configuration.clone();
                state.system_configuration.enable_for_item_parsing();
                let builtin_table_updates = CatalogState::parse_builtin_views(
                    state,
                    builtin_view_additions,
                    retractions,
                    local_expression_cache,
                )
                .await;
                state.system_configuration = restore;
                builtin_table_updates
            }
            BootstrapApplyState::Items(updates) => state.with_enable_for_item_parsing(|state| {
                state
                    .apply_updates_inner(updates, retractions, local_expression_cache)
                    .expect("corrupt catalog")
            }),
            BootstrapApplyState::Updates(updates) => state
                .apply_updates_inner(updates, retractions, local_expression_cache)
                .expect("corrupt catalog"),
        }
    }

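    /// Folds `next` into the current batch if both are of the same kind; otherwise applies the
    /// current batch to `state` and returns `next` as the new in-progress batch, along with
    /// any builtin table updates produced.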
    async fn step(
        self,
        next: BootstrapApplyState,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        BootstrapApplyState,
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    ) {
        match (self, next) {
            (
                BootstrapApplyState::BuiltinViewAdditions(mut builtin_view_additions),
                BootstrapApplyState::BuiltinViewAdditions(next_builtin_view_additions),
            ) => {
                builtin_view_additions.extend(next_builtin_view_additions);
                (
                    BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions),
                    Vec::new(),
                )
            }
            (BootstrapApplyState::Items(mut updates), BootstrapApplyState::Items(next_updates)) => {
                updates.extend(next_updates);
                (BootstrapApplyState::Items(updates), Vec::new())
            }
            (
                BootstrapApplyState::Updates(mut updates),
                BootstrapApplyState::Updates(next_updates),
            ) => {
                updates.extend(next_updates);
                (BootstrapApplyState::Updates(updates), Vec::new())
            }
            (apply_state, next_apply_state) => {
                let builtin_table_update = apply_state
                    .apply(state, retractions, local_expression_cache)
                    .await;
                (next_apply_state, builtin_table_update)
            }
        }
    }
}

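/// Applies a single addition or retraction to the inverted lookup map `map`, asserting that an
/// addition never overwrites an existing entry and that a retraction removes exactly the value
/// it claims to remove.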
fn apply_inverted_lookup<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: V, diff: StateDiff)
where
    K: Ord + Clone + Debug,
    V: PartialEq + Debug,
{
    match diff {
        StateDiff::Retraction => {
            let prev = map.remove(key);
            assert_eq!(
                prev,
                Some(value),
                "retraction does not match existing value: {key:?}"
            );
        }
        StateDiff::Addition => {
            let prev = map.insert(key.clone(), value);
            assert_eq!(
                prev, None,
                "values must be explicitly retracted before inserting a new value: {key:?}"
            );
        }
    }
}

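/// Applies a single addition or retraction of durable object `durable` to the in-memory map
/// `map`. Retracted values are stashed in `retractions`, keyed by the durable key, so that a
/// later addition for the same durable key is applied as an in-place update of the retracted
/// value rather than rebuilding it from scratch.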
fn apply_with_update<K, V, D>(
    map: &mut BTreeMap<K, V>,
    durable: D,
    key_fn: impl FnOnce(&D) -> K,
    diff: StateDiff,
    retractions: &mut BTreeMap<D::Key, V>,
) where
    K: Ord,
    V: UpdateFrom<D> + PartialEq + Debug,
    D: DurableType,
    D::Key: Ord,
{
    match diff {
        StateDiff::Retraction => {
            let mem_key = key_fn(&durable);
            let value = map
                .remove(&mem_key)
                .expect("retraction does not match any existing value");
            let durable_key = durable.into_key_value().0;
            retractions.insert(durable_key, value);
        }
        StateDiff::Addition => {
            let mem_key = key_fn(&durable);
            let durable_key = durable.key();
            let value = match retractions.remove(&durable_key) {
                Some(mut retraction) => {
                    retraction.update_from(durable);
                    retraction
                }
                None => durable.into(),
            };
            let prev = map.insert(mem_key, value);
            assert_eq!(
                prev, None,
                "values must be explicitly retracted before inserting a new value"
            );
        }
    }
}

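/// Resolves a durable system object mapping to the static builtin view it describes, along with
/// its catalog item ID and global ID. Panics if the mapping does not refer to a builtin view.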
fn lookup_builtin_view_addition(
    mapping: SystemObjectMapping,
) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
    let (_, builtin) = BUILTIN_LOOKUP
        .get(&mapping.description)
        .expect("missing builtin view");
    let Builtin::View(view) = builtin else {
        unreachable!("programming error, expected BuiltinView found {builtin:?}");
    };

    (
        view,
        mapping.unique_identifier.catalog_id,
        mapping.unique_identifier.global_id,
    )
}