1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures::future;
20use itertools::{Either, Itertools};
21use mz_adapter_types::connection::ConnectionId;
22use mz_catalog::SYSTEM_CONN_ID;
23use mz_catalog::builtin::{
24 BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
25};
26use mz_catalog::durable::objects::{
27 ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
28 SchemaKey,
29};
30use mz_catalog::durable::{CatalogError, SystemObjectMapping};
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
34 NetworkPolicy, Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
35 TableDataSource, TemporaryItem, Type, UpdateFrom,
36};
37use mz_compute_types::config::ComputeReplicaConfig;
38use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
39use mz_controller_types::ClusterId;
40use mz_expr::MirScalarExpr;
41use mz_ore::collections::CollectionExt;
42use mz_ore::tracing::OpenTelemetryContext;
43use mz_ore::{assert_none, instrument, soft_assert_no_log};
44use mz_pgrepr::oid::INVALID_OID;
45use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
48use mz_sql::catalog::CatalogError as SqlCatalogError;
49use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
50use mz_sql::names::{
51 FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
52 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
53};
54use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
55use mz_sql::session::vars::{VarError, VarInput};
56use mz_sql::{plan, rbac};
57use mz_sql_parser::ast::Expr;
58use mz_storage_types::sources::Timeline;
59use tracing::{Instrument, info_span, warn};
60
61use crate::AdapterError;
62use crate::catalog::state::LocalExpressionCache;
63use crate::catalog::{BuiltinTableUpdate, CatalogState};
64use crate::util::index_sql;
65
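/// Retractions that have been applied at the current timestamp but whose matching additions have
/// not yet been processed. Additions within the same timestamp can reuse these in-memory objects
/// instead of rebuilding them from scratch.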
66#[derive(Debug, Clone, Default)]
78struct InProgressRetractions {
79 roles: BTreeMap<RoleKey, Role>,
80 role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
81 databases: BTreeMap<DatabaseKey, Database>,
82 schemas: BTreeMap<SchemaKey, Schema>,
83 clusters: BTreeMap<ClusterKey, Cluster>,
84 network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
85 items: BTreeMap<ItemKey, CatalogEntry>,
86 temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
87 introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
88 system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
89}
90
91impl CatalogState {
92 #[must_use]
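    /// Updates the in-memory catalog state to reflect a list of durable catalog updates made
    /// during bootstrap, returning the corresponding builtin table updates.
    ///
    /// Builtin view definitions are batched so they can be parsed concurrently (see
    /// `parse_builtin_views`), which is why this variant is `async`.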
99 #[instrument]
100 pub(crate) async fn apply_updates_for_bootstrap(
101 &mut self,
102 updates: Vec<StateUpdate>,
103 local_expression_cache: &mut LocalExpressionCache,
104 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
105 let mut builtin_table_updates = Vec::with_capacity(updates.len());
106 let updates = sort_updates(updates);
107
108 let mut groups: Vec<Vec<_>> = Vec::new();
109 for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
110 groups.push(updates.collect());
111 }
112 for updates in groups {
113 let mut apply_state = BootstrapApplyState::Updates(Vec::new());
114 let mut retractions = InProgressRetractions::default();
115
116 for update in updates {
117 let next_apply_state = BootstrapApplyState::new(update);
118 let (next_apply_state, builtin_table_update) = apply_state
119 .step(
120 next_apply_state,
121 self,
122 &mut retractions,
123 local_expression_cache,
124 )
125 .await;
126 apply_state = next_apply_state;
127 builtin_table_updates.extend(builtin_table_update);
128 }
129
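            // Apply whatever batched state remains at the end of this timestamp group.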
130 let builtin_table_update = apply_state
132 .apply(self, &mut retractions, local_expression_cache)
133 .await;
134 builtin_table_updates.extend(builtin_table_update);
135 }
136 builtin_table_updates
137 }
138
139 #[instrument]
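    /// Updates the in-memory catalog state to reflect a list of durable catalog updates and
    /// returns the corresponding builtin table updates. Unlike the bootstrap path, this variant
    /// is synchronous and uses a closed local expression cache.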
143 pub(crate) fn apply_updates(
144 &mut self,
145 updates: Vec<StateUpdate>,
146 ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
147 let mut builtin_table_updates = Vec::with_capacity(updates.len());
148 let updates = sort_updates(updates);
149
150 for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
151 let mut retractions = InProgressRetractions::default();
152 let builtin_table_update = self.apply_updates_inner(
153 updates.collect(),
154 &mut retractions,
155 &mut LocalExpressionCache::Closed,
156 )?;
157 builtin_table_updates.extend(builtin_table_update);
158 }
159
160 Ok(builtin_table_updates)
161 }
162
163 #[instrument(level = "debug")]
164 fn apply_updates_inner(
165 &mut self,
166 updates: Vec<StateUpdate>,
167 retractions: &mut InProgressRetractions,
168 local_expression_cache: &mut LocalExpressionCache,
169 ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
170 soft_assert_no_log!(
171 updates.iter().map(|update| update.ts).all_equal(),
172 "all timestamps should be equal: {updates:?}"
173 );
174
175 let mut update_system_config = false;
176
177 let mut builtin_table_updates = Vec::with_capacity(updates.len());
178 for StateUpdate { kind, ts: _, diff } in updates {
179 if matches!(kind, StateUpdateKind::SystemConfiguration(_)) {
180 update_system_config = true;
181 }
182
183 match diff {
184 StateDiff::Retraction => {
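                    // Generate the builtin table updates before applying the retraction, while
                    // the in-memory state still contains the object being retracted.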
185 builtin_table_updates
188 .extend(self.generate_builtin_table_update(kind.clone(), diff));
189 self.apply_update(kind, diff, retractions, local_expression_cache)?;
190 }
191 StateDiff::Addition => {
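                    // Apply the addition first so that the builtin table updates are generated
                    // from the newly inserted in-memory state.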
192 self.apply_update(kind.clone(), diff, retractions, local_expression_cache)?;
193 builtin_table_updates
196 .extend(self.generate_builtin_table_update(kind.clone(), diff));
197 }
198 }
199 }
200
201 if update_system_config {
202 self.system_configuration.dyncfg_updates();
203 }
204
205 Ok(builtin_table_updates)
206 }
207
208 #[instrument(level = "debug")]
209 fn apply_update(
210 &mut self,
211 kind: StateUpdateKind,
212 diff: StateDiff,
213 retractions: &mut InProgressRetractions,
214 local_expression_cache: &mut LocalExpressionCache,
215 ) -> Result<(), CatalogError> {
216 match kind {
217 StateUpdateKind::Role(role) => {
218 self.apply_role_update(role, diff, retractions);
219 }
220 StateUpdateKind::RoleAuth(role_auth) => {
221 self.apply_role_auth_update(role_auth, diff, retractions);
222 }
223 StateUpdateKind::Database(database) => {
224 self.apply_database_update(database, diff, retractions);
225 }
226 StateUpdateKind::Schema(schema) => {
227 self.apply_schema_update(schema, diff, retractions);
228 }
229 StateUpdateKind::DefaultPrivilege(default_privilege) => {
230 self.apply_default_privilege_update(default_privilege, diff, retractions);
231 }
232 StateUpdateKind::SystemPrivilege(system_privilege) => {
233 self.apply_system_privilege_update(system_privilege, diff, retractions);
234 }
235 StateUpdateKind::SystemConfiguration(system_configuration) => {
236 self.apply_system_configuration_update(system_configuration, diff, retractions);
237 }
238 StateUpdateKind::Cluster(cluster) => {
239 self.apply_cluster_update(cluster, diff, retractions);
240 }
241 StateUpdateKind::NetworkPolicy(network_policy) => {
242 self.apply_network_policy_update(network_policy, diff, retractions);
243 }
244 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
245 self.apply_introspection_source_index_update(
246 introspection_source_index,
247 diff,
248 retractions,
249 );
250 }
251 StateUpdateKind::ClusterReplica(cluster_replica) => {
252 self.apply_cluster_replica_update(cluster_replica, diff, retractions);
253 }
254 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
255 self.apply_system_object_mapping_update(
256 system_object_mapping,
257 diff,
258 retractions,
259 local_expression_cache,
260 );
261 }
262 StateUpdateKind::TemporaryItem(item) => {
263 self.apply_temporary_item_update(item, diff, retractions);
264 }
265 StateUpdateKind::Item(item) => {
266 self.apply_item_update(item, diff, retractions, local_expression_cache)?;
267 }
268 StateUpdateKind::Comment(comment) => {
269 self.apply_comment_update(comment, diff, retractions);
270 }
271 StateUpdateKind::SourceReferences(source_reference) => {
272 self.apply_source_references_update(source_reference, diff, retractions);
273 }
274 StateUpdateKind::AuditLog(_audit_log) => {
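                // Audit log events are not stored in the in-memory catalog state; they are only
                // reflected in the durable catalog and the builtin audit log table.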
275 }
277 StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
278 self.apply_storage_collection_metadata_update(
279 storage_collection_metadata,
280 diff,
281 retractions,
282 );
283 }
284 StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
285 self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
286 }
287 }
288
289 Ok(())
290 }
291
292 #[instrument(level = "debug")]
293 fn apply_role_auth_update(
294 &mut self,
295 role_auth: mz_catalog::durable::RoleAuth,
296 diff: StateDiff,
297 retractions: &mut InProgressRetractions,
298 ) {
299 apply_with_update(
300 &mut self.role_auth_by_id,
301 role_auth,
302 |role_auth| role_auth.role_id,
303 diff,
304 &mut retractions.role_auths,
305 );
306 }
307
308 #[instrument(level = "debug")]
309 fn apply_role_update(
310 &mut self,
311 role: mz_catalog::durable::Role,
312 diff: StateDiff,
313 retractions: &mut InProgressRetractions,
314 ) {
315 apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
316 apply_with_update(
317 &mut self.roles_by_id,
318 role,
319 |role| role.id,
320 diff,
321 &mut retractions.roles,
322 );
323 }
324
325 #[instrument(level = "debug")]
326 fn apply_database_update(
327 &mut self,
328 database: mz_catalog::durable::Database,
329 diff: StateDiff,
330 retractions: &mut InProgressRetractions,
331 ) {
332 apply_inverted_lookup(
333 &mut self.database_by_name,
334 &database.name,
335 database.id,
336 diff,
337 );
338 apply_with_update(
339 &mut self.database_by_id,
340 database,
341 |database| database.id,
342 diff,
343 &mut retractions.databases,
344 );
345 }
346
347 #[instrument(level = "debug")]
348 fn apply_schema_update(
349 &mut self,
350 schema: mz_catalog::durable::Schema,
351 diff: StateDiff,
352 retractions: &mut InProgressRetractions,
353 ) {
354 let (schemas_by_id, schemas_by_name) = match &schema.database_id {
355 Some(database_id) => {
356 let db = self
357 .database_by_id
358 .get_mut(database_id)
359 .expect("catalog out of sync");
360 (&mut db.schemas_by_id, &mut db.schemas_by_name)
361 }
362 None => (
363 &mut self.ambient_schemas_by_id,
364 &mut self.ambient_schemas_by_name,
365 ),
366 };
367 apply_inverted_lookup(schemas_by_name, &schema.name, schema.id, diff);
368 apply_with_update(
369 schemas_by_id,
370 schema,
371 |schema| schema.id,
372 diff,
373 &mut retractions.schemas,
374 );
375 }
376
377 #[instrument(level = "debug")]
378 fn apply_default_privilege_update(
379 &mut self,
380 default_privilege: mz_catalog::durable::DefaultPrivilege,
381 diff: StateDiff,
382 _retractions: &mut InProgressRetractions,
383 ) {
384 match diff {
385 StateDiff::Addition => self
386 .default_privileges
387 .grant(default_privilege.object, default_privilege.acl_item),
388 StateDiff::Retraction => self
389 .default_privileges
390 .revoke(&default_privilege.object, &default_privilege.acl_item),
391 }
392 }
393
394 #[instrument(level = "debug")]
395 fn apply_system_privilege_update(
396 &mut self,
397 system_privilege: MzAclItem,
398 diff: StateDiff,
399 _retractions: &mut InProgressRetractions,
400 ) {
401 match diff {
402 StateDiff::Addition => self.system_privileges.grant(system_privilege),
403 StateDiff::Retraction => self.system_privileges.revoke(&system_privilege),
404 }
405 }
406
407 #[instrument(level = "debug")]
408 fn apply_system_configuration_update(
409 &mut self,
410 system_configuration: mz_catalog::durable::SystemConfiguration,
411 diff: StateDiff,
412 _retractions: &mut InProgressRetractions,
413 ) {
414 let res = match diff {
415 StateDiff::Addition => self.insert_system_configuration(
416 &system_configuration.name,
417 VarInput::Flat(&system_configuration.value),
418 ),
419 StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
420 };
421 match res {
422 Ok(_) => (),
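            // A parameter present in the durable catalog but unknown to this process is most
            // likely from a different Materialize version; warn and skip it instead of failing.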
423 Err(Error {
427 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
428 }) => {
429 warn!(%name, "unknown system parameter from catalog storage");
430 }
431 Err(e) => panic!("unable to update system variable: {e:?}"),
432 }
433 }
434
435 #[instrument(level = "debug")]
436 fn apply_cluster_update(
437 &mut self,
438 cluster: mz_catalog::durable::Cluster,
439 diff: StateDiff,
440 retractions: &mut InProgressRetractions,
441 ) {
442 apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
443 apply_with_update(
444 &mut self.clusters_by_id,
445 cluster,
446 |cluster| cluster.id,
447 diff,
448 &mut retractions.clusters,
449 );
450 }
451
452 #[instrument(level = "debug")]
453 fn apply_network_policy_update(
454 &mut self,
455 policy: mz_catalog::durable::NetworkPolicy,
456 diff: StateDiff,
457 retractions: &mut InProgressRetractions,
458 ) {
459 apply_inverted_lookup(
460 &mut self.network_policies_by_name,
461 &policy.name,
462 policy.id,
463 diff,
464 );
465 apply_with_update(
466 &mut self.network_policies_by_id,
467 policy,
468 |policy| policy.id,
469 diff,
470 &mut retractions.network_policies,
471 );
472 }
473
474 #[instrument(level = "debug")]
475 fn apply_introspection_source_index_update(
476 &mut self,
477 introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
478 diff: StateDiff,
479 retractions: &mut InProgressRetractions,
480 ) {
481 let cluster = self
482 .clusters_by_id
483 .get_mut(&introspection_source_index.cluster_id)
484 .expect("catalog out of sync");
485 let log = BUILTIN_LOG_LOOKUP
486 .get(introspection_source_index.name.as_str())
487 .expect("missing log");
488 apply_inverted_lookup(
489 &mut cluster.log_indexes,
490 &log.variant,
491 introspection_source_index.index_id,
492 diff,
493 );
494
495 match diff {
496 StateDiff::Addition => {
497 if let Some(mut entry) = retractions
498 .introspection_source_indexes
499 .remove(&introspection_source_index.item_id)
500 {
501 let (index_name, index) = self.create_introspection_source_index(
504 introspection_source_index.cluster_id,
505 log,
506 introspection_source_index.index_id,
507 );
508 assert_eq!(entry.id, introspection_source_index.item_id);
509 assert_eq!(entry.oid, introspection_source_index.oid);
510 assert_eq!(entry.name, index_name);
511 entry.item = index;
512 self.insert_entry(entry);
513 } else {
514 self.insert_introspection_source_index(
515 introspection_source_index.cluster_id,
516 log,
517 introspection_source_index.item_id,
518 introspection_source_index.index_id,
519 introspection_source_index.oid,
520 );
521 }
522 }
523 StateDiff::Retraction => {
524 let entry = self.drop_item(introspection_source_index.item_id);
525 retractions
526 .introspection_source_indexes
527 .insert(entry.id, entry);
528 }
529 }
530 }
531
532 #[instrument(level = "debug")]
533 fn apply_cluster_replica_update(
534 &mut self,
535 cluster_replica: mz_catalog::durable::ClusterReplica,
536 diff: StateDiff,
537 _retractions: &mut InProgressRetractions,
538 ) {
539 let cluster = self
540 .clusters_by_id
541 .get(&cluster_replica.cluster_id)
542 .expect("catalog out of sync");
543 let azs = cluster.availability_zones();
544 let location = self
545 .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
546 .expect("catalog in unexpected state");
547 let cluster = self
548 .clusters_by_id
549 .get_mut(&cluster_replica.cluster_id)
550 .expect("catalog out of sync");
551 apply_inverted_lookup(
552 &mut cluster.replica_id_by_name_,
553 &cluster_replica.name,
554 cluster_replica.replica_id,
555 diff,
556 );
557 match diff {
558 StateDiff::Retraction => {
559 let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
560 assert!(
561 prev.is_some(),
562 "retraction does not match existing value: {:?}",
563 cluster_replica.replica_id
564 );
565 }
566 StateDiff::Addition => {
567 let logging = ReplicaLogging {
568 log_logging: cluster_replica.config.logging.log_logging,
569 interval: cluster_replica.config.logging.interval,
570 };
571 let config = ReplicaConfig {
572 location,
573 compute: ComputeReplicaConfig { logging },
574 };
575 let mem_cluster_replica = ClusterReplica {
576 name: cluster_replica.name.clone(),
577 cluster_id: cluster_replica.cluster_id,
578 replica_id: cluster_replica.replica_id,
579 config,
580 owner_id: cluster_replica.owner_id,
581 };
582 let prev = cluster
583 .replicas_by_id_
584 .insert(cluster_replica.replica_id, mem_cluster_replica);
585 assert_eq!(
586 prev, None,
587 "values must be explicitly retracted before inserting a new value: {:?}",
588 cluster_replica.replica_id
589 );
590 }
591 }
592 }
593
594 #[instrument(level = "debug")]
595 fn apply_system_object_mapping_update(
596 &mut self,
597 system_object_mapping: mz_catalog::durable::SystemObjectMapping,
598 diff: StateDiff,
599 retractions: &mut InProgressRetractions,
600 local_expression_cache: &mut LocalExpressionCache,
601 ) {
602 let item_id = system_object_mapping.unique_identifier.catalog_id;
603 let global_id = system_object_mapping.unique_identifier.global_id;
604
605 if system_object_mapping.unique_identifier.runtime_alterable() {
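            // Runtime-alterable system objects have their definitions stored as regular durable
            // items, so their in-memory state is expected to be handled by the corresponding
            // item updates rather than here.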
606 return;
610 }
611
612 if let StateDiff::Retraction = diff {
613 let entry = self.drop_item(item_id);
614 retractions.system_object_mappings.insert(item_id, entry);
615 return;
616 }
617
618 if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
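            // The same mapping was retracted earlier at this timestamp; reuse the already-built
            // entry rather than re-parsing the builtin definition.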
619 self.insert_entry(entry);
624 return;
625 }
626
627 let builtin = BUILTIN_LOOKUP
628 .get(&system_object_mapping.description)
629 .expect("missing builtin")
630 .1;
631 let schema_name = builtin.schema();
632 let schema_id = self
633 .ambient_schemas_by_name
634 .get(schema_name)
635 .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
636 let name = QualifiedItemName {
637 qualifiers: ItemQualifiers {
638 database_spec: ResolvedDatabaseSpecifier::Ambient,
639 schema_spec: SchemaSpecifier::Id(*schema_id),
640 },
641 item: builtin.name().into(),
642 };
643 match builtin {
644 Builtin::Log(log) => {
645 let mut acl_items = vec![rbac::owner_privilege(
646 mz_sql::catalog::ObjectType::Source,
647 MZ_SYSTEM_ROLE_ID,
648 )];
649 acl_items.extend_from_slice(&log.access);
650 self.insert_item(
651 item_id,
652 log.oid,
653 name.clone(),
654 CatalogItem::Log(Log {
655 variant: log.variant,
656 global_id,
657 }),
658 MZ_SYSTEM_ROLE_ID,
659 PrivilegeMap::from_mz_acl_items(acl_items),
660 );
661 }
662
663 Builtin::Table(table) => {
664 let mut acl_items = vec![rbac::owner_privilege(
665 mz_sql::catalog::ObjectType::Table,
666 MZ_SYSTEM_ROLE_ID,
667 )];
668 acl_items.extend_from_slice(&table.access);
669
670 self.insert_item(
671 item_id,
672 table.oid,
673 name.clone(),
674 CatalogItem::Table(Table {
675 create_sql: None,
676 desc: VersionedRelationDesc::new(table.desc.clone()),
677 collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
678 conn_id: None,
679 resolved_ids: ResolvedIds::empty(),
680 custom_logical_compaction_window: table.is_retained_metrics_object.then(
681 || {
682 self.system_config()
683 .metrics_retention()
684 .try_into()
685 .expect("invalid metrics retention")
686 },
687 ),
688 is_retained_metrics_object: table.is_retained_metrics_object,
689 data_source: TableDataSource::TableWrites {
690 defaults: vec![Expr::null(); table.desc.arity()],
691 },
692 }),
693 MZ_SYSTEM_ROLE_ID,
694 PrivilegeMap::from_mz_acl_items(acl_items),
695 );
696 }
697 Builtin::Index(index) => {
698 let custom_logical_compaction_window =
699 index.is_retained_metrics_object.then(|| {
700 self.system_config()
701 .metrics_retention()
702 .try_into()
703 .expect("invalid metrics retention")
704 });
705 let versions = BTreeMap::new();
707
708 let item = self
709 .parse_item(
710 global_id,
711 &index.create_sql(),
712 &versions,
713 None,
714 index.is_retained_metrics_object,
715 custom_logical_compaction_window,
716 local_expression_cache,
717 None,
718 )
719 .unwrap_or_else(|e| {
720 panic!(
721 "internal error: failed to load bootstrap index:\n\
722 {}\n\
723 error:\n\
724 {:?}\n\n\
725 make sure that the schema name is specified in the builtin index's create sql statement.",
726 index.name, e
727 )
728 });
729 let CatalogItem::Index(_) = item else {
730 panic!(
731 "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
732 index.name
733 );
734 };
735
736 self.insert_item(
737 item_id,
738 index.oid,
739 name,
740 item,
741 MZ_SYSTEM_ROLE_ID,
742 PrivilegeMap::default(),
743 );
744 }
745 Builtin::View(_) => {
746 unreachable!("views added elsewhere");
748 }
749
750 Builtin::Type(typ) => {
752 let typ = self.resolve_builtin_type_references(typ);
753 if let CatalogType::Array { element_reference } = typ.details.typ {
754 let entry = self.get_entry_mut(&element_reference);
755 let item_type = match &mut entry.item {
756 CatalogItem::Type(item_type) => item_type,
757 _ => unreachable!("types can only reference other types"),
758 };
759 item_type.details.array_id = Some(item_id);
760 }
761
762 let desc = None;
766 assert!(!matches!(typ.details.typ, CatalogType::Record { .. }));
767 let schema_id = self.resolve_system_schema(typ.schema);
768
769 self.insert_item(
770 item_id,
771 typ.oid,
772 QualifiedItemName {
773 qualifiers: ItemQualifiers {
774 database_spec: ResolvedDatabaseSpecifier::Ambient,
775 schema_spec: SchemaSpecifier::Id(schema_id),
776 },
777 item: typ.name.to_owned(),
778 },
779 CatalogItem::Type(Type {
780 create_sql: None,
781 global_id,
782 details: typ.details.clone(),
783 desc,
784 resolved_ids: ResolvedIds::empty(),
785 }),
786 MZ_SYSTEM_ROLE_ID,
787 PrivilegeMap::from_mz_acl_items(vec![
788 rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
789 rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
790 ]),
791 );
792 }
793
794 Builtin::Func(func) => {
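                // The entry-level OID is not used for functions; each function implementation
                // carries its own OID, so the catalog entry itself gets `INVALID_OID`.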
795 let oid = INVALID_OID;
799 self.insert_item(
800 item_id,
801 oid,
802 name.clone(),
803 CatalogItem::Func(Func {
804 inner: func.inner,
805 global_id,
806 }),
807 MZ_SYSTEM_ROLE_ID,
808 PrivilegeMap::default(),
809 );
810 }
811
812 Builtin::Source(coll) => {
813 let mut acl_items = vec![rbac::owner_privilege(
814 mz_sql::catalog::ObjectType::Source,
815 MZ_SYSTEM_ROLE_ID,
816 )];
817 acl_items.extend_from_slice(&coll.access);
818
819 self.insert_item(
820 item_id,
821 coll.oid,
822 name.clone(),
823 CatalogItem::Source(Source {
824 create_sql: None,
825 data_source: DataSourceDesc::Introspection(coll.data_source),
826 desc: coll.desc.clone(),
827 global_id,
828 timeline: Timeline::EpochMilliseconds,
829 resolved_ids: ResolvedIds::empty(),
830 custom_logical_compaction_window: coll.is_retained_metrics_object.then(
831 || {
832 self.system_config()
833 .metrics_retention()
834 .try_into()
835 .expect("invalid metrics retention")
836 },
837 ),
838 is_retained_metrics_object: coll.is_retained_metrics_object,
839 }),
840 MZ_SYSTEM_ROLE_ID,
841 PrivilegeMap::from_mz_acl_items(acl_items),
842 );
843 }
844 Builtin::ContinualTask(ct) => {
845 let mut acl_items = vec![rbac::owner_privilege(
846 mz_sql::catalog::ObjectType::Source,
847 MZ_SYSTEM_ROLE_ID,
848 )];
849 acl_items.extend_from_slice(&ct.access);
850 let versions = BTreeMap::new();
852
853 let item = self
854 .parse_item(
855 global_id,
856 &ct.create_sql(),
857 &versions,
858 None,
859 false,
860 None,
861 local_expression_cache,
862 None,
863 )
864 .unwrap_or_else(|e| {
865 panic!(
866 "internal error: failed to load bootstrap continual task:\n\
867 {}\n\
868 error:\n\
869 {:?}\n\n\
870 make sure that the schema name is specified in the builtin continual task's create sql statement.",
871 ct.name, e
872 )
873 });
874 let CatalogItem::ContinualTask(_) = &item else {
875 panic!(
876 "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
877 ct.name
878 );
879 };
880
881 self.insert_item(
882 item_id,
883 ct.oid,
884 name,
885 item,
886 MZ_SYSTEM_ROLE_ID,
887 PrivilegeMap::from_mz_acl_items(acl_items),
888 );
889 }
890 Builtin::Connection(connection) => {
891 let versions = BTreeMap::new();
893 let mut item = self
894 .parse_item(
895 global_id,
896 connection.sql,
897 &versions,
898 None,
899 false,
900 None,
901 local_expression_cache,
902 None,
903 )
904 .unwrap_or_else(|e| {
905 panic!(
906 "internal error: failed to load bootstrap connection:\n\
907 {}\n\
908 error:\n\
909 {:?}\n\n\
910 make sure that the schema name is specified in the builtin connection's create sql statement.",
911 connection.name, e
912 )
913 });
914 let CatalogItem::Connection(_) = &mut item else {
915 panic!(
916 "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
917 connection.name
918 );
919 };
920
921 let mut acl_items = vec![rbac::owner_privilege(
922 mz_sql::catalog::ObjectType::Connection,
923 connection.owner_id.clone(),
924 )];
925 acl_items.extend_from_slice(connection.access);
926
927 self.insert_item(
928 item_id,
929 connection.oid,
930 name.clone(),
931 item,
932 connection.owner_id.clone(),
933 PrivilegeMap::from_mz_acl_items(acl_items),
934 );
935 }
936 }
937 }
938
939 #[instrument(level = "debug")]
940 fn apply_temporary_item_update(
941 &mut self,
942 TemporaryItem {
943 id,
944 oid,
945 name,
946 item,
947 owner_id,
948 privileges,
949 }: TemporaryItem,
950 diff: StateDiff,
951 retractions: &mut InProgressRetractions,
952 ) {
953 match diff {
954 StateDiff::Addition => {
955 let entry = match retractions.temp_items.remove(&id) {
956 Some(mut retraction) => {
957 assert_eq!(retraction.id, id);
958 retraction.item = item;
959 retraction.id = id;
960 retraction.oid = oid;
961 retraction.name = name;
962 retraction.owner_id = owner_id;
963 retraction.privileges = privileges;
964 retraction
965 }
966 None => CatalogEntry {
967 item,
968 referenced_by: Vec::new(),
969 used_by: Vec::new(),
970 id,
971 oid,
972 name,
973 owner_id,
974 privileges,
975 },
976 };
977 self.insert_entry(entry);
978 }
979 StateDiff::Retraction => {
980 let entry = self.drop_item(id);
981 retractions.temp_items.insert(id, entry);
982 }
983 }
984 }
985
986 #[instrument(level = "debug")]
987 fn apply_item_update(
988 &mut self,
989 item: mz_catalog::durable::Item,
990 diff: StateDiff,
991 retractions: &mut InProgressRetractions,
992 local_expression_cache: &mut LocalExpressionCache,
993 ) -> Result<(), CatalogError> {
994 match diff {
995 StateDiff::Addition => {
996 let key = item.key();
997 let mz_catalog::durable::Item {
998 id,
999 oid,
1000 global_id,
1001 schema_id,
1002 name,
1003 create_sql,
1004 owner_id,
1005 privileges,
1006 extra_versions,
1007 } = item;
1008 let schema = self.find_non_temp_schema(&schema_id);
1009 let name = QualifiedItemName {
1010 qualifiers: ItemQualifiers {
1011 database_spec: schema.database().clone(),
1012 schema_spec: schema.id().clone(),
1013 },
1014 item: name.clone(),
1015 };
1016 let entry = match retractions.items.remove(&key) {
1017 Some(mut retraction) => {
1018 assert_eq!(retraction.id, id);
1019 if retraction.create_sql() != create_sql {
1024 let item = self
1025 .deserialize_item(
1026 global_id,
1027 &create_sql,
1028 &extra_versions,
1029 local_expression_cache,
1030 Some(retraction.item),
1031 )
1032 .unwrap_or_else(|e| {
1033 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1034 });
1035 retraction.item = item;
1036 }
1037 retraction.id = id;
1038 retraction.oid = oid;
1039 retraction.name = name;
1040 retraction.owner_id = owner_id;
1041 retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1042
1043 retraction
1044 }
1045 None => {
1046 let catalog_item = self
1047 .deserialize_item(
1048 global_id,
1049 &create_sql,
1050 &extra_versions,
1051 local_expression_cache,
1052 None,
1053 )
1054 .unwrap_or_else(|e| {
1055 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1056 });
1057 CatalogEntry {
1058 item: catalog_item,
1059 referenced_by: Vec::new(),
1060 used_by: Vec::new(),
1061 id,
1062 oid,
1063 name,
1064 owner_id,
1065 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1066 }
1067 }
1068 };
1069
1070 self.insert_entry(entry);
1071 }
1072 StateDiff::Retraction => {
1073 let entry = self.drop_item(item.id);
1074 let key = item.into_key_value().0;
1075 retractions.items.insert(key, entry);
1076 }
1077 }
1078 Ok(())
1079 }
1080
1081 #[instrument(level = "debug")]
1082 fn apply_comment_update(
1083 &mut self,
1084 comment: mz_catalog::durable::Comment,
1085 diff: StateDiff,
1086 _retractions: &mut InProgressRetractions,
1087 ) {
1088 match diff {
1089 StateDiff::Addition => {
1090 let prev = self.comments.update_comment(
1091 comment.object_id,
1092 comment.sub_component,
1093 Some(comment.comment),
1094 );
1095 assert_eq!(
1096 prev, None,
1097 "values must be explicitly retracted before inserting a new value"
1098 );
1099 }
1100 StateDiff::Retraction => {
1101 let prev =
1102 self.comments
1103 .update_comment(comment.object_id, comment.sub_component, None);
1104 assert_eq!(
1105 prev,
1106 Some(comment.comment),
1107 "retraction does not match existing value: ({:?}, {:?})",
1108 comment.object_id,
1109 comment.sub_component,
1110 );
1111 }
1112 }
1113 }
1114
1115 #[instrument(level = "debug")]
1116 fn apply_source_references_update(
1117 &mut self,
1118 source_references: mz_catalog::durable::SourceReferences,
1119 diff: StateDiff,
1120 _retractions: &mut InProgressRetractions,
1121 ) {
1122 match diff {
1123 StateDiff::Addition => {
1124 let prev = self
1125 .source_references
1126 .insert(source_references.source_id, source_references.into());
1127 assert!(
1128 prev.is_none(),
1129 "values must be explicitly retracted before inserting a new value: {prev:?}"
1130 );
1131 }
1132 StateDiff::Retraction => {
1133 let prev = self.source_references.remove(&source_references.source_id);
1134 assert!(
1135 prev.is_some(),
1136 "retraction of a non-existent value: {source_references:?}"
1137 );
1138 }
1139 }
1140 }
1141
1142 #[instrument(level = "debug")]
1143 fn apply_storage_collection_metadata_update(
1144 &mut self,
1145 storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1146 diff: StateDiff,
1147 _retractions: &mut InProgressRetractions,
1148 ) {
1149 apply_inverted_lookup(
1150 &mut self.storage_metadata.collection_metadata,
1151 &storage_collection_metadata.id,
1152 storage_collection_metadata.shard,
1153 diff,
1154 );
1155 }
1156
1157 #[instrument(level = "debug")]
1158 fn apply_unfinalized_shard_update(
1159 &mut self,
1160 unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1161 diff: StateDiff,
1162 _retractions: &mut InProgressRetractions,
1163 ) {
1164 match diff {
1165 StateDiff::Addition => {
1166 let newly_inserted = self
1167 .storage_metadata
1168 .unfinalized_shards
1169 .insert(unfinalized_shard.shard);
1170 assert!(
1171 newly_inserted,
1172 "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1173 );
1174 }
1175 StateDiff::Retraction => {
1176 let removed = self
1177 .storage_metadata
1178 .unfinalized_shards
1179 .remove(&unfinalized_shard.shard);
1180 assert!(
1181 removed,
1182 "retraction does not match existing value: {unfinalized_shard:?}"
1183 );
1184 }
1185 }
1186 }
1187
1188 #[instrument]
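    /// Generates and resolves the builtin table updates that correspond to a batch of state
    /// updates, without modifying the in-memory catalog state.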
1191 pub(crate) fn generate_builtin_table_updates(
1192 &self,
1193 updates: Vec<StateUpdate>,
1194 ) -> Vec<BuiltinTableUpdate> {
1195 let mut builtin_table_updates = Vec::new();
1196 for StateUpdate { kind, ts: _, diff } in updates {
1197 let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1198 let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1199 builtin_table_updates.extend(builtin_table_update);
1200 }
1201 builtin_table_updates
1202 }
1203
1204 #[instrument(level = "debug")]
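    /// Generates the builtin table updates for a single state update. The returned updates still
    /// reference the static `BuiltinTable`s; callers resolve them to concrete table IDs
    /// separately.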
1207 pub(crate) fn generate_builtin_table_update(
1208 &self,
1209 kind: StateUpdateKind,
1210 diff: StateDiff,
1211 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1212 let diff = diff.into();
1213 match kind {
1214 StateUpdateKind::Role(role) => {
1215 let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1216 for group_id in role.membership.map.keys() {
1217 builtin_table_updates
1218 .push(self.pack_role_members_update(*group_id, role.id, diff))
1219 }
1220 builtin_table_updates
1221 }
1222 StateUpdateKind::Database(database) => {
1223 vec![self.pack_database_update(&database.id, diff)]
1224 }
1225 StateUpdateKind::Schema(schema) => {
1226 let db_spec = schema.database_id.into();
1227 vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1228 }
1229 StateUpdateKind::DefaultPrivilege(default_privilege) => {
1230 vec![self.pack_default_privileges_update(
1231 &default_privilege.object,
1232 &default_privilege.acl_item.grantee,
1233 &default_privilege.acl_item.acl_mode,
1234 diff,
1235 )]
1236 }
1237 StateUpdateKind::SystemPrivilege(system_privilege) => {
1238 vec![self.pack_system_privileges_update(system_privilege, diff)]
1239 }
1240 StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1241 StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1242 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1243 self.pack_item_update(introspection_source_index.item_id, diff)
1244 }
1245 StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1246 cluster_replica.cluster_id,
1247 &cluster_replica.name,
1248 diff,
1249 ),
1250 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1251 if !system_object_mapping.unique_identifier.runtime_alterable() {
1255 self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1256 } else {
1257 vec![]
1258 }
1259 }
1260 StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1261 StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1262 StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1263 comment.object_id,
1264 comment.sub_component,
1265 &comment.comment,
1266 diff,
1267 )],
1268 StateUpdateKind::SourceReferences(source_references) => {
1269 self.pack_source_references_update(&source_references, diff)
1270 }
1271 StateUpdateKind::AuditLog(audit_log) => {
1272 vec![
1273 self.pack_audit_log_update(&audit_log.event, diff)
1274 .expect("could not pack audit log update"),
1275 ]
1276 }
1277 StateUpdateKind::NetworkPolicy(policy) => self
1278 .pack_network_policy_update(&policy.id, diff)
1279 .expect("could not pack network policy update"),
1280 StateUpdateKind::StorageCollectionMetadata(_)
1281 | StateUpdateKind::UnfinalizedShard(_)
1282 | StateUpdateKind::RoleAuth(_) => Vec::new(),
1283 }
1284 }
1285
1286 fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1287 self.entry_by_id
1288 .get_mut(id)
1289 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1290 }
1291
1292 fn get_schema_mut(
1293 &mut self,
1294 database_spec: &ResolvedDatabaseSpecifier,
1295 schema_spec: &SchemaSpecifier,
1296 conn_id: &ConnectionId,
1297 ) -> &mut Schema {
1298 match (database_spec, schema_spec) {
1300 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1301 .temporary_schemas
1302 .get_mut(conn_id)
1303 .expect("catalog out of sync"),
1304 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1305 .ambient_schemas_by_id
1306 .get_mut(id)
1307 .expect("catalog out of sync"),
1308 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1309 .database_by_id
1310 .get_mut(database_id)
1311 .expect("catalog out of sync")
1312 .schemas_by_id
1313 .get_mut(schema_id)
1314 .expect("catalog out of sync"),
1315 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1316 unreachable!("temporary schemas are in the ambient database")
1317 }
1318 }
1319 }
1320
1321 #[instrument(name = "catalog::parse_views")]
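    /// Parses the given builtin views, inserts them into `state`, and returns the corresponding
    /// builtin table updates.
    ///
    /// Views are parsed concurrently on separate tasks. Because builtin views may depend on one
    /// another, a view that fails to plan due to a missing dependency is queued and retried once
    /// that dependency has been inserted, and views that fail with an invalid-cast error are
    /// retried after all other in-flight views have completed.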
1331 async fn parse_builtin_views(
1332 state: &mut CatalogState,
1333 builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
1334 retractions: &mut InProgressRetractions,
1335 local_expression_cache: &mut LocalExpressionCache,
1336 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1337 let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
1338 let (updates, additions): (Vec<_>, Vec<_>) =
1339 builtin_views
1340 .into_iter()
1341 .partition_map(|(view, item_id, gid)| {
1342 match retractions.system_object_mappings.remove(&item_id) {
1343 Some(entry) => Either::Left(entry),
1344 None => Either::Right((view, item_id, gid)),
1345 }
1346 });
1347
1348 for entry in updates {
1349 let item_id = entry.id();
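            // This entry was retracted earlier at the same timestamp; reuse the already-parsed
            // view definition instead of planning it again.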
1354 state.insert_entry(entry);
1355 builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
1356 }
1357
1358 let mut handles = Vec::new();
1359 let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
1360 BTreeMap::new();
1361 let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
1362 let mut awaiting_all = Vec::new();
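        // Views that failed to plan with an invalid-cast error; they are retried once no other
        // views are ready or in flight.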
1365 let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
1367 let mut completed_names: BTreeSet<String> = BTreeSet::new();
1368
1369 let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
1371 .into_iter()
1372 .map(|(view, item_id, gid)| (item_id, (view, gid)))
1373 .collect();
1374 let item_ids: Vec<_> = views.keys().copied().collect();
1375
1376 let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
1377 while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
1378 if handles.is_empty() && ready.is_empty() {
1379 ready.extend(awaiting_all.drain(..));
1381 }
1382
1383 if !ready.is_empty() {
1385 let spawn_state = Arc::new(state.clone());
1386 while let Some(id) = ready.pop_front() {
1387 let (view, global_id) = views.get(&id).expect("must exist");
1388 let global_id = *global_id;
1389 let create_sql = view.create_sql();
1390 let versions = BTreeMap::new();
1392
1393 let span = info_span!(parent: None, "parse builtin view", name = view.name);
1394 OpenTelemetryContext::obtain().attach_as_parent_to(&span);
1395 let task_state = Arc::clone(&spawn_state);
1396 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1397 let handle = mz_ore::task::spawn(
1398 || "parse view",
1399 async move {
1400 let res = task_state.parse_item_inner(
1401 global_id,
1402 &create_sql,
1403 &versions,
1404 None,
1405 false,
1406 None,
1407 cached_expr,
1408 None,
1409 );
1410 (id, global_id, res)
1411 }
1412 .instrument(span),
1413 );
1414 handles.push(handle);
1415 }
1416 }
1417
1418 let (handle, _idx, remaining) = future::select_all(handles).await;
1420 handles = remaining;
1421 let (id, global_id, res) = handle.expect("must join");
1422 let mut insert_cached_expr = |cached_expr| {
1423 if let Some(cached_expr) = cached_expr {
1424 local_expression_cache.insert_cached_expression(global_id, cached_expr);
1425 }
1426 };
1427 match res {
1428 Ok((item, uncached_expr)) => {
1429 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1430 local_expression_cache.insert_uncached_expression(
1431 global_id,
1432 uncached_expr,
1433 optimizer_features,
1434 );
1435 }
1436 let (view, _gid) = views.remove(&id).expect("must exist");
1438 let schema_id = state
1439 .ambient_schemas_by_name
1440 .get(view.schema)
1441 .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
1442 let qname = QualifiedItemName {
1443 qualifiers: ItemQualifiers {
1444 database_spec: ResolvedDatabaseSpecifier::Ambient,
1445 schema_spec: SchemaSpecifier::Id(*schema_id),
1446 },
1447 item: view.name.into(),
1448 };
1449 let mut acl_items = vec![rbac::owner_privilege(
1450 mz_sql::catalog::ObjectType::View,
1451 MZ_SYSTEM_ROLE_ID,
1452 )];
1453 acl_items.extend_from_slice(&view.access);
1454
1455 state.insert_item(
1456 id,
1457 view.oid,
1458 qname,
1459 item,
1460 MZ_SYSTEM_ROLE_ID,
1461 PrivilegeMap::from_mz_acl_items(acl_items),
1462 );
1463
1464 let mut resolved_dependent_items = Vec::new();
1466 if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
1467 resolved_dependent_items.extend(dependent_items);
1468 }
1469 let entry = state.get_entry(&id);
1470 let full_name = state.resolve_full_name(entry.name(), None).to_string();
1471 if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
1472 resolved_dependent_items.extend(dependent_items);
1473 }
1474 ready.extend(resolved_dependent_items);
1475
1476 completed_ids.insert(id);
1477 completed_names.insert(full_name);
1478 }
1479 Err((
1481 AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
1482 cached_expr,
1483 )) => {
1484 insert_cached_expr(cached_expr);
1485 if completed_ids.contains(&missing_dep) {
1486 ready.push_back(id);
1487 } else {
1488 awaiting_id_dependencies
1489 .entry(missing_dep)
1490 .or_default()
1491 .push(id);
1492 }
1493 }
1494 Err((
1496 AdapterError::PlanError(plan::PlanError::Catalog(
1497 SqlCatalogError::UnknownItem(missing_dep),
1498 )),
1499 cached_expr,
1500 )) => {
1501 insert_cached_expr(cached_expr);
1502 match CatalogItemId::from_str(&missing_dep) {
1503 Ok(missing_dep) => {
1504 if completed_ids.contains(&missing_dep) {
1505 ready.push_back(id);
1506 } else {
1507 awaiting_id_dependencies
1508 .entry(missing_dep)
1509 .or_default()
1510 .push(id);
1511 }
1512 }
1513 Err(_) => {
1514 if completed_names.contains(&missing_dep) {
1515 ready.push_back(id);
1516 } else {
1517 awaiting_name_dependencies
1518 .entry(missing_dep)
1519 .or_default()
1520 .push(id);
1521 }
1522 }
1523 }
1524 }
1525 Err((
1526 AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
1527 cached_expr,
1528 )) => {
1529 insert_cached_expr(cached_expr);
1530 awaiting_all.push(id);
1531 }
1532 Err((e, _)) => {
1533 let (bad_view, _gid) = views.get(&id).expect("must exist");
1534 panic!(
1535 "internal error: failed to load bootstrap view:\n\
1536 {name}\n\
1537 error:\n\
1538 {e:?}\n\n\
1539 Make sure that the schema name is specified in the builtin view's create sql statement.",
1541 name = bad_view.name,
1542 )
1543 }
1544 }
1545 }
1546
1547 assert!(awaiting_id_dependencies.is_empty());
1548 assert!(
1549 awaiting_name_dependencies.is_empty(),
1550 "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
1551 );
1552 assert!(awaiting_all.is_empty());
1553 assert!(views.is_empty());
1554
1555 builtin_table_updates.extend(
1557 item_ids
1558 .into_iter()
1559 .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
1560 );
1561
1562 builtin_table_updates
1563 }
1564
1565 fn insert_entry(&mut self, entry: CatalogEntry) {
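        // System items are not tracked in a cluster's `bound_objects`.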
1567 if !entry.id.is_system() {
1568 if let Some(cluster_id) = entry.item.cluster_id() {
1569 self.clusters_by_id
1570 .get_mut(&cluster_id)
1571 .expect("catalog out of sync")
1572 .bound_objects
1573 .insert(entry.id);
1574 };
1575 }
1576
1577 for u in entry.references().items() {
1578 match self.entry_by_id.get_mut(u) {
1579 Some(metadata) => metadata.referenced_by.push(entry.id()),
1580 None => panic!(
1581 "Catalog: missing dependent catalog item {} while installing {}",
1582 &u,
1583 self.resolve_full_name(entry.name(), entry.conn_id())
1584 ),
1585 }
1586 }
1587 for u in entry.uses() {
1588 if u == entry.id() {
1591 continue;
1592 }
1593 match self.entry_by_id.get_mut(&u) {
1594 Some(metadata) => metadata.used_by.push(entry.id()),
1595 None => panic!(
1596 "Catalog: missing dependent catalog item {} while installing {}",
1597 &u,
1598 self.resolve_full_name(entry.name(), entry.conn_id())
1599 ),
1600 }
1601 }
1602 for gid in entry.item.global_ids() {
1603 self.entry_by_global_id.insert(gid, entry.id());
1604 }
1605 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1606 let schema = self.get_schema_mut(
1607 &entry.name().qualifiers.database_spec,
1608 &entry.name().qualifiers.schema_spec,
1609 conn_id,
1610 );
1611
1612 let prev_id = match entry.item() {
1613 CatalogItem::Func(_) => schema
1614 .functions
1615 .insert(entry.name().item.clone(), entry.id()),
1616 CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1617 _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1618 };
1619
1620 assert!(
1621 prev_id.is_none(),
1622 "builtin name collision on {:?}",
1623 entry.name().item.clone()
1624 );
1625
1626 self.entry_by_id.insert(entry.id(), entry.clone());
1627 }
1628
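    /// Wraps the given parts in a `CatalogEntry` with no references or dependants and inserts it
    /// into the catalog state via `insert_entry`.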
1629 fn insert_item(
1631 &mut self,
1632 id: CatalogItemId,
1633 oid: u32,
1634 name: QualifiedItemName,
1635 item: CatalogItem,
1636 owner_id: RoleId,
1637 privileges: PrivilegeMap,
1638 ) {
1639 let entry = CatalogEntry {
1640 item,
1641 name,
1642 id,
1643 oid,
1644 used_by: Vec::new(),
1645 referenced_by: Vec::new(),
1646 owner_id,
1647 privileges,
1648 };
1649
1650 self.insert_entry(entry);
1651 }
1652
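    /// Removes the entry for `id` from the in-memory catalog state, unregistering it from
    /// dependency tracking, its schema, and its cluster's bound objects, and returns the removed
    /// entry.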
1653 #[mz_ore::instrument(level = "trace")]
1654 fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1655 let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1656 for u in metadata.references().items() {
1657 if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1658 dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1659 }
1660 }
1661 for u in metadata.uses() {
1662 if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1663 dep_metadata.used_by.retain(|u| *u != metadata.id())
1664 }
1665 }
1666 for gid in metadata.global_ids() {
1667 self.entry_by_global_id.remove(&gid);
1668 }
1669
1670 let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1671 let schema = self.get_schema_mut(
1672 &metadata.name().qualifiers.database_spec,
1673 &metadata.name().qualifiers.schema_spec,
1674 conn_id,
1675 );
1676 if metadata.item_type() == CatalogItemType::Type {
1677 schema
1678 .types
1679 .remove(&metadata.name().item)
1680 .expect("catalog out of sync");
1681 } else {
1682 assert_ne!(metadata.item_type(), CatalogItemType::Func);
1685
1686 schema
1687 .items
1688 .remove(&metadata.name().item)
1689 .expect("catalog out of sync");
1690 };
1691
1692 if !id.is_system() {
1693 if let Some(cluster_id) = metadata.item().cluster_id() {
1694 assert!(
1695 self.clusters_by_id
1696 .get_mut(&cluster_id)
1697 .expect("catalog out of sync")
1698 .bound_objects
1699 .remove(&id),
1700 "catalog out of sync"
1701 );
1702 }
1703 }
1704
1705 metadata
1706 }
1707
1708 fn insert_introspection_source_index(
1709 &mut self,
1710 cluster_id: ClusterId,
1711 log: &'static BuiltinLog,
1712 item_id: CatalogItemId,
1713 global_id: GlobalId,
1714 oid: u32,
1715 ) {
1716 let (index_name, index) =
1717 self.create_introspection_source_index(cluster_id, log, global_id);
1718 self.insert_item(
1719 item_id,
1720 oid,
1721 index_name,
1722 index,
1723 MZ_SYSTEM_ROLE_ID,
1724 PrivilegeMap::default(),
1725 );
1726 }
1727
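    /// Constructs the qualified name and `CatalogItem::Index` for the introspection source index
    /// of `log` on `cluster_id`, without modifying the catalog state.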
1728 fn create_introspection_source_index(
1729 &self,
1730 cluster_id: ClusterId,
1731 log: &'static BuiltinLog,
1732 global_id: GlobalId,
1733 ) -> (QualifiedItemName, CatalogItem) {
1734 let source_name = FullItemName {
1735 database: RawDatabaseSpecifier::Ambient,
1736 schema: log.schema.into(),
1737 item: log.name.into(),
1738 };
1739 let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1740 let mut index_name = QualifiedItemName {
1741 qualifiers: ItemQualifiers {
1742 database_spec: ResolvedDatabaseSpecifier::Ambient,
1743 schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1744 },
1745 item: index_name.clone(),
1746 };
1747 index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1748 let index_item_name = index_name.item.clone();
1749 let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1750 let index = CatalogItem::Index(Index {
1751 global_id,
1752 on: log_global_id,
1753 keys: log
1754 .variant
1755 .index_by()
1756 .into_iter()
1757 .map(MirScalarExpr::column)
1758 .collect(),
1759 create_sql: index_sql(
1760 index_item_name,
1761 cluster_id,
1762 source_name,
1763 &log.variant.desc(),
1764 &log.variant.index_by(),
1765 ),
1766 conn_id: None,
1767 resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1768 cluster_id,
1769 is_retained_metrics_object: false,
1770 custom_logical_compaction_window: None,
1771 });
1772 (index_name, index)
1773 }
1774
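    /// Sets the named system configuration parameter, converting any `VarError` into the
    /// catalog's `Error` type.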
1775 fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1780 Ok(self.system_configuration.set(name, value)?)
1781 }
1782
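    /// Resets the named system configuration parameter to its default, converting any `VarError`
    /// into the catalog's `Error` type.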
1783 fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1788 Ok(self.system_configuration.reset(name)?)
1789 }
1790}
1791
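/// Sorts a batch of state updates into a safe apply order: updates are first grouped by
/// timestamp, then each group is sorted into dependency order by `sort_updates_inner`.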
1792fn sort_updates(mut updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1794 let mut sorted_updates = Vec::with_capacity(updates.len());
1795
1796 updates.sort_by_key(|update| update.ts);
1797 for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
1798 let sorted_ts_updates = sort_updates_inner(updates.collect());
1799 sorted_updates.extend(sorted_ts_updates);
1800 }
1801
1802 sorted_updates
1803}
1804
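/// Sorts the state updates for a single timestamp into a safe apply order. Additions are ordered
/// so that dependencies come before their dependants (e.g., roles, databases, and schemas before
/// clusters and items), and retractions are emitted in the reverse order.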
1805fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1807 fn push_update<T>(
1808 update: T,
1809 diff: StateDiff,
1810 retractions: &mut Vec<T>,
1811 additions: &mut Vec<T>,
1812 ) {
1813 match diff {
1814 StateDiff::Retraction => retractions.push(update),
1815 StateDiff::Addition => additions.push(update),
1816 }
1817 }
1818
1819 soft_assert_no_log!(
1820 updates.iter().map(|update| update.ts).all_equal(),
1821 "all timestamps should be equal: {updates:?}"
1822 );
1823
1824 let mut pre_cluster_retractions = Vec::new();
1826 let mut pre_cluster_additions = Vec::new();
1827 let mut cluster_retractions = Vec::new();
1828 let mut cluster_additions = Vec::new();
1829 let mut builtin_item_updates = Vec::new();
1830 let mut item_retractions = Vec::new();
1831 let mut item_additions = Vec::new();
1832 let mut temp_item_retractions = Vec::new();
1833 let mut temp_item_additions = Vec::new();
1834 let mut post_item_retractions = Vec::new();
1835 let mut post_item_additions = Vec::new();
1836 for update in updates {
1837 let diff = update.diff.clone();
1838 match update.kind {
1839 StateUpdateKind::Role(_)
1840 | StateUpdateKind::RoleAuth(_)
1841 | StateUpdateKind::Database(_)
1842 | StateUpdateKind::Schema(_)
1843 | StateUpdateKind::DefaultPrivilege(_)
1844 | StateUpdateKind::SystemPrivilege(_)
1845 | StateUpdateKind::SystemConfiguration(_)
1846 | StateUpdateKind::NetworkPolicy(_) => push_update(
1847 update,
1848 diff,
1849 &mut pre_cluster_retractions,
1850 &mut pre_cluster_additions,
1851 ),
1852 StateUpdateKind::Cluster(_)
1853 | StateUpdateKind::IntrospectionSourceIndex(_)
1854 | StateUpdateKind::ClusterReplica(_) => push_update(
1855 update,
1856 diff,
1857 &mut cluster_retractions,
1858 &mut cluster_additions,
1859 ),
1860 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1861 builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
1862 }
1863 StateUpdateKind::TemporaryItem(item) => push_update(
1864 (item, update.ts, update.diff),
1865 diff,
1866 &mut temp_item_retractions,
1867 &mut temp_item_additions,
1868 ),
1869 StateUpdateKind::Item(item) => push_update(
1870 (item, update.ts, update.diff),
1871 diff,
1872 &mut item_retractions,
1873 &mut item_additions,
1874 ),
1875 StateUpdateKind::Comment(_)
1876 | StateUpdateKind::SourceReferences(_)
1877 | StateUpdateKind::AuditLog(_)
1878 | StateUpdateKind::StorageCollectionMetadata(_)
1879 | StateUpdateKind::UnfinalizedShard(_) => push_update(
1880 update,
1881 diff,
1882 &mut post_item_retractions,
1883 &mut post_item_additions,
1884 ),
1885 }
1886 }
1887
1888 let builtin_item_updates = builtin_item_updates
1890 .into_iter()
1891 .map(|(system_object_mapping, ts, diff)| {
1892 let idx = BUILTIN_LOOKUP
1893 .get(&system_object_mapping.description)
1894 .expect("missing builtin")
1895 .0;
1896 (idx, system_object_mapping, ts, diff)
1897 })
1898 .sorted_by_key(|(idx, _, _, _)| *idx)
1899 .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
1900
1901 let mut other_builtin_retractions = Vec::new();
1903 let mut other_builtin_additions = Vec::new();
1904 let mut builtin_index_retractions = Vec::new();
1905 let mut builtin_index_additions = Vec::new();
1906 for (builtin_item_update, ts, diff) in builtin_item_updates {
1907 match &builtin_item_update.description.object_type {
1908 CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
1909 StateUpdate {
1910 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
1911 ts,
1912 diff,
1913 },
1914 diff,
1915 &mut builtin_index_retractions,
1916 &mut builtin_index_additions,
1917 ),
1918 CatalogItemType::Table
1919 | CatalogItemType::Source
1920 | CatalogItemType::Sink
1921 | CatalogItemType::View
1922 | CatalogItemType::MaterializedView
1923 | CatalogItemType::Type
1924 | CatalogItemType::Func
1925 | CatalogItemType::Secret
1926 | CatalogItemType::Connection => push_update(
1927 StateUpdate {
1928 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
1929 ts,
1930 diff,
1931 },
1932 diff,
1933 &mut other_builtin_retractions,
1934 &mut other_builtin_additions,
1935 ),
1936 }
1937 }
1938
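    /// Sorts `connections` in place so that each connection appears after any connection it
    /// depends on (e.g., an AWS PrivateLink connection referenced by a Kafka connection). The
    /// dependencies are extracted from each connection's `CREATE` statement and resolved with a
    /// Kahn-style topological sort; a dependency cycle is a programming error and panics.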
1939 fn sort_connections(connections: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
1946 let mut topo: BTreeMap<
1947 (mz_catalog::durable::Item, Timestamp, StateDiff),
1948 BTreeSet<CatalogItemId>,
1949 > = BTreeMap::default();
1950 let existing_connections: BTreeSet<_> = connections.iter().map(|item| item.0.id).collect();
1951
1952 tracing::debug!(?connections, "sorting connections");
1954 for (connection, ts, diff) in connections.drain(..) {
1955 let statement = mz_sql::parse::parse(&connection.create_sql)
1956 .expect("valid CONNECTION create_sql")
1957 .into_element()
1958 .ast;
1959 let mut dependencies = mz_sql::names::dependencies(&statement)
1960 .expect("failed to find dependencies of CONNECTION");
1961 dependencies.remove(&connection.id);
1963 dependencies.retain(|dep| existing_connections.contains(dep));
1966
1967 assert_none!(topo.insert((connection, ts, diff), dependencies));
1969 }
1970 tracing::debug!(?topo, ?existing_connections, "built topological sort");
1971
1972 while !topo.is_empty() {
1974 let no_deps: Vec<_> = topo
1976 .iter()
1977 .filter_map(|(item, deps)| {
1978 if deps.is_empty() {
1979 Some(item.clone())
1980 } else {
1981 None
1982 }
1983 })
1984 .collect();
1985
1986 if no_deps.is_empty() {
1988 panic!("programming error: dependency cycle among connections");
1989 }
1990
1991 for item in no_deps {
1993 topo.remove(&item);
1995 topo.values_mut().for_each(|deps| {
1997 deps.remove(&item.0.id);
1998 });
1999 connections.push(item);
2001 }
2002 }
2003 }
2004
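    /// Sorts item updates into a safe apply order based on their catalog item type: types first,
    /// then functions, secrets, connections, sources, and tables, followed by derived items
    /// (views, materialized views, indexes), sinks, and continual tasks. Within each group items
    /// are ordered by ID, except connections, which are topologically sorted by their
    /// dependencies.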
2005 fn sort_item_updates(
2021 item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2022 ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2023 let mut types = Vec::new();
2026 let mut funcs = Vec::new();
2029 let mut secrets = Vec::new();
2030 let mut connections = Vec::new();
2031 let mut sources = Vec::new();
2032 let mut tables = Vec::new();
2033 let mut derived_items = Vec::new();
2034 let mut sinks = Vec::new();
2035 let mut continual_tasks = Vec::new();
2036
2037 for update in item_updates {
2038 match update.0.item_type() {
2039 CatalogItemType::Type => types.push(update),
2040 CatalogItemType::Func => funcs.push(update),
2041 CatalogItemType::Secret => secrets.push(update),
2042 CatalogItemType::Connection => connections.push(update),
2043 CatalogItemType::Source => sources.push(update),
2044 CatalogItemType::Table => tables.push(update),
2045 CatalogItemType::View
2046 | CatalogItemType::MaterializedView
2047 | CatalogItemType::Index => derived_items.push(update),
2048 CatalogItemType::Sink => sinks.push(update),
2049 CatalogItemType::ContinualTask => continual_tasks.push(update),
2050 }
2051 }
2052
2053 for group in [
2055 &mut types,
2056 &mut funcs,
2057 &mut secrets,
2058 &mut sources,
2059 &mut tables,
2060 &mut derived_items,
2061 &mut sinks,
2062 &mut continual_tasks,
2063 ] {
2064 group.sort_by_key(|(item, _, _)| item.id);
2065 }
2066
2067 sort_connections(&mut connections);
2071
2072 iter::empty()
2073 .chain(types)
2074 .chain(funcs)
2075 .chain(secrets)
2076 .chain(connections)
2077 .chain(sources)
2078 .chain(tables)
2079 .chain(derived_items)
2080 .chain(sinks)
2081 .chain(continual_tasks)
2082 .collect()
2083 }
2084
2085 let item_retractions = sort_item_updates(item_retractions);
2086 let item_additions = sort_item_updates(item_additions);
2087
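    /// Sorts temporary item updates into the same type-based apply order as `sort_item_updates`,
    /// ordering items within each group by ID.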
    fn sort_temp_item_updates(
        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in temp_item_updates {
            match update.0.item.typ() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut connections,
            &mut sources,
            &mut tables,
            &mut derived_items,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }
    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
    let temp_item_additions = sort_temp_item_updates(temp_item_additions);
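
    /// Merges the ID-sorted durable and temporary item updates into a single
    /// list of `StateUpdate`s ordered by item ID.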
    fn merge_item_updates(
        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> Vec<StateUpdate> {
        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());

        while let (Some((item, _, _)), Some((temp_item, _, _))) =
            (item_updates.front(), temp_item_updates.front())
        {
            if item.id < temp_item.id {
                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::Item(item),
                    ts,
                    diff,
                });
            } else if item.id > temp_item.id {
                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::TemporaryItem(temp_item),
                    ts,
                    diff,
                });
            } else {
                unreachable!(
                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
                );
            }
        }

        // At most one of the two queues still has elements; drain whatever
        // remains in order.
        while let Some((item, ts, diff)) = item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::Item(item),
                ts,
                diff,
            });
        }

        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::TemporaryItem(temp_item),
                ts,
                diff,
            });
        }

        state_updates
    }
    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
    let item_additions = merge_item_updates(item_additions, temp_item_additions);

    // Retractions are emitted in the reverse order of the corresponding
    // additions.
    iter::empty()
        .chain(post_item_retractions.into_iter().rev())
        .chain(item_retractions.into_iter().rev())
        .chain(builtin_index_retractions.into_iter().rev())
        .chain(cluster_retractions.into_iter().rev())
        .chain(other_builtin_retractions.into_iter().rev())
        .chain(pre_cluster_retractions.into_iter().rev())
        .chain(pre_cluster_additions)
        .chain(other_builtin_additions)
        .chain(cluster_additions)
        .chain(builtin_index_additions)
        .chain(item_additions)
        .chain(post_item_additions)
        .collect()
}
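
/// Accumulates consecutive bootstrap updates of the same kind so that they can
/// be applied together as a single batch.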
enum BootstrapApplyState {
    /// Additions of builtin views, which are parsed together in one batch.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Item updates that are applied with item-parsing feature flags enabled.
    Items(Vec<StateUpdate>),
    /// All other updates.
    Updates(Vec<StateUpdate>),
}

impl BootstrapApplyState {
    fn new(update: StateUpdate) -> BootstrapApplyState {
        match update {
            StateUpdate {
                kind: StateUpdateKind::SystemObjectMapping(system_object_mapping),
                diff: StateDiff::Addition,
                ..
            } if matches!(
                system_object_mapping.description.object_type,
                CatalogItemType::View
            ) =>
            {
                let view_addition = lookup_builtin_view_addition(system_object_mapping);
                BootstrapApplyState::BuiltinViewAdditions(vec![view_addition])
            }
            StateUpdate {
                kind: StateUpdateKind::IntrospectionSourceIndex(_),
                ..
            }
            | StateUpdate {
                kind: StateUpdateKind::SystemObjectMapping(_),
                ..
            }
            | StateUpdate {
                kind: StateUpdateKind::Item(_),
                ..
            } => BootstrapApplyState::Items(vec![update]),
            update => BootstrapApplyState::Updates(vec![update]),
        }
    }
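
    /// Applies the accumulated batch of updates to `state` and returns the
    /// resulting builtin table updates. Builtin view and item batches are
    /// applied with item-parsing feature flags temporarily enabled.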
    async fn apply(
        self,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match self {
            BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions) => {
                // Temporarily enable the configuration needed to parse builtin
                // view definitions, then restore the previous configuration.
                let restore = state.system_configuration.clone();
                state.system_configuration.enable_for_item_parsing();
                let builtin_table_updates = CatalogState::parse_builtin_views(
                    state,
                    builtin_view_additions,
                    retractions,
                    local_expression_cache,
                )
                .await;
                state.system_configuration = restore;
                builtin_table_updates
            }
            BootstrapApplyState::Items(updates) => state.with_enable_for_item_parsing(|state| {
                state
                    .apply_updates_inner(updates, retractions, local_expression_cache)
                    .expect("corrupt catalog")
            }),
            BootstrapApplyState::Updates(updates) => state
                .apply_updates_inner(updates, retractions, local_expression_cache)
                .expect("corrupt catalog"),
        }
    }
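
    /// Folds `next` into the current batch if both hold the same kind of
    /// update; otherwise applies the current batch and starts a new one from
    /// `next`.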
    async fn step(
        self,
        next: BootstrapApplyState,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        BootstrapApplyState,
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    ) {
        match (self, next) {
            (
                BootstrapApplyState::BuiltinViewAdditions(mut builtin_view_additions),
                BootstrapApplyState::BuiltinViewAdditions(next_builtin_view_additions),
            ) => {
                builtin_view_additions.extend(next_builtin_view_additions);
                (
                    BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions),
                    Vec::new(),
                )
            }
            (BootstrapApplyState::Items(mut updates), BootstrapApplyState::Items(next_updates)) => {
                updates.extend(next_updates);
                (BootstrapApplyState::Items(updates), Vec::new())
            }
            (
                BootstrapApplyState::Updates(mut updates),
                BootstrapApplyState::Updates(next_updates),
            ) => {
                updates.extend(next_updates);
                (BootstrapApplyState::Updates(updates), Vec::new())
            }
            (apply_state, next_apply_state) => {
                let builtin_table_update = apply_state
                    .apply(state, retractions, local_expression_cache)
                    .await;
                (next_apply_state, builtin_table_update)
            }
        }
    }
}
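
/// Applies a single addition or retraction to `map`, asserting that an addition
/// never overwrites an existing entry and that a retraction matches the value
/// currently stored under `key`.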
fn apply_inverted_lookup<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: V, diff: StateDiff)
where
    K: Ord + Clone + Debug,
    V: PartialEq + Debug,
{
    match diff {
        StateDiff::Retraction => {
            let prev = map.remove(key);
            assert_eq!(
                prev,
                Some(value),
                "retraction does not match existing value: {key:?}"
            );
        }
        StateDiff::Addition => {
            let prev = map.insert(key.clone(), value);
            assert_eq!(
                prev, None,
                "values must be explicitly retracted before inserting a new value: {key:?}"
            );
        }
    }
}
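
/// Applies an addition or retraction of a durable object to `map`. Retracted
/// values are stashed in `retractions` so that a later addition with the same
/// durable key can update the retracted value in place instead of rebuilding it
/// from scratch.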
fn apply_with_update<K, V, D>(
    map: &mut BTreeMap<K, V>,
    durable: D,
    key_fn: impl FnOnce(&D) -> K,
    diff: StateDiff,
    retractions: &mut BTreeMap<D::Key, V>,
) where
    K: Ord,
    V: UpdateFrom<D> + PartialEq + Debug,
    D: DurableType,
    D::Key: Ord,
{
    match diff {
        StateDiff::Retraction => {
            let mem_key = key_fn(&durable);
            let value = map
                .remove(&mem_key)
                .expect("retraction does not match existing value");
            let durable_key = durable.into_key_value().0;
            retractions.insert(durable_key, value);
        }
        StateDiff::Addition => {
            let mem_key = key_fn(&durable);
            let durable_key = durable.key();
            // If this addition follows a retraction of the same durable key,
            // reuse the retracted value and update it in place.
            let value = match retractions.remove(&durable_key) {
                Some(mut retraction) => {
                    retraction.update_from(durable);
                    retraction
                }
                None => durable.into(),
            };
            let prev = map.insert(mem_key, value);
            assert_eq!(
                prev, None,
                "values must be explicitly retracted before inserting a new value"
            );
        }
    }
}
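
/// Looks up the static `BuiltinView` definition corresponding to a system
/// object mapping, panicking if the mapping does not refer to a builtin view.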
fn lookup_builtin_view_addition(
    mapping: SystemObjectMapping,
) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
    let (_, builtin) = BUILTIN_LOOKUP
        .get(&mapping.description)
        .expect("missing builtin view");
    let Builtin::View(view) = builtin else {
        unreachable!("programming error, expected BuiltinView, found {builtin:?}");
    };

    (
        view,
        mapping.unique_identifier.catalog_id,
        mapping.unique_identifier.global_id,
    )
}