1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures::future;
20use itertools::{Either, Itertools};
21use mz_adapter_types::connection::ConnectionId;
22use mz_catalog::SYSTEM_CONN_ID;
23use mz_catalog::builtin::{
24 BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
25};
26use mz_catalog::durable::objects::{
27 ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
28 SchemaKey,
29};
30use mz_catalog::durable::{CatalogError, SystemObjectMapping};
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
34 NetworkPolicy, Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
35 TableDataSource, TemporaryItem, Type, UpdateFrom,
36};
37use mz_compute_types::config::ComputeReplicaConfig;
38use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
39use mz_controller_types::ClusterId;
40use mz_expr::MirScalarExpr;
41use mz_ore::collections::CollectionExt;
42use mz_ore::tracing::OpenTelemetryContext;
43use mz_ore::{assert_none, instrument, soft_assert_no_log};
44use mz_pgrepr::oid::INVALID_OID;
45use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
48use mz_sql::catalog::CatalogError as SqlCatalogError;
49use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
50use mz_sql::names::{
51 FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
52 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
53};
54use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
55use mz_sql::session::vars::{VarError, VarInput};
56use mz_sql::{plan, rbac};
57use mz_sql_parser::ast::Expr;
58use mz_storage_types::sources::Timeline;
59use tracing::{Instrument, info_span, warn};
60
61use crate::AdapterError;
62use crate::catalog::state::LocalExpressionCache;
63use crate::catalog::{BuiltinTableUpdate, CatalogState};
64use crate::util::index_sql;
65
66#[derive(Debug, Clone, Default)]
78struct InProgressRetractions {
79 roles: BTreeMap<RoleKey, Role>,
80 role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
81 databases: BTreeMap<DatabaseKey, Database>,
82 schemas: BTreeMap<SchemaKey, Schema>,
83 clusters: BTreeMap<ClusterKey, Cluster>,
84 network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
85 items: BTreeMap<ItemKey, CatalogEntry>,
86 temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
87 introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
88 system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
89}
90
91impl CatalogState {
92 #[must_use]
99 #[instrument]
100 pub(crate) async fn apply_updates_for_bootstrap(
101 &mut self,
102 updates: Vec<StateUpdate>,
103 local_expression_cache: &mut LocalExpressionCache,
104 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
105 let mut builtin_table_updates = Vec::with_capacity(updates.len());
106 let updates = sort_updates(updates);
107
108 let mut groups: Vec<Vec<_>> = Vec::new();
109 for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
110 groups.push(updates.collect());
111 }
112 for updates in groups {
113 let mut apply_state = BootstrapApplyState::Updates(Vec::new());
114 let mut retractions = InProgressRetractions::default();
115
116 for update in updates {
117 let next_apply_state = BootstrapApplyState::new(update);
118 let (next_apply_state, builtin_table_update) = apply_state
119 .step(
120 next_apply_state,
121 self,
122 &mut retractions,
123 local_expression_cache,
124 )
125 .await;
126 apply_state = next_apply_state;
127 builtin_table_updates.extend(builtin_table_update);
128 }
129
130 let builtin_table_update = apply_state
132 .apply(self, &mut retractions, local_expression_cache)
133 .await;
134 builtin_table_updates.extend(builtin_table_update);
135 }
136 builtin_table_updates
137 }
138
139 #[instrument]
143 pub(crate) fn apply_updates(
144 &mut self,
145 updates: Vec<StateUpdate>,
146 ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
147 let mut builtin_table_updates = Vec::with_capacity(updates.len());
148 let updates = sort_updates(updates);
149
150 for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
151 let mut retractions = InProgressRetractions::default();
152 let builtin_table_update = self.apply_updates_inner(
153 updates.collect(),
154 &mut retractions,
155 &mut LocalExpressionCache::Closed,
156 )?;
157 builtin_table_updates.extend(builtin_table_update);
158 }
159
160 Ok(builtin_table_updates)
161 }
162
163 #[instrument(level = "debug")]
164 fn apply_updates_inner(
165 &mut self,
166 updates: Vec<StateUpdate>,
167 retractions: &mut InProgressRetractions,
168 local_expression_cache: &mut LocalExpressionCache,
169 ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
170 soft_assert_no_log!(
171 updates.iter().map(|update| update.ts).all_equal(),
172 "all timestamps should be equal: {updates:?}"
173 );
174
175 let mut update_system_config = false;
176
177 let mut builtin_table_updates = Vec::with_capacity(updates.len());
178 for StateUpdate { kind, ts: _, diff } in updates {
179 if matches!(kind, StateUpdateKind::SystemConfiguration(_)) {
180 update_system_config = true;
181 }
182
183 match diff {
184 StateDiff::Retraction => {
185 builtin_table_updates
188 .extend(self.generate_builtin_table_update(kind.clone(), diff));
189 self.apply_update(kind, diff, retractions, local_expression_cache)?;
190 }
191 StateDiff::Addition => {
192 self.apply_update(kind.clone(), diff, retractions, local_expression_cache)?;
193 builtin_table_updates
196 .extend(self.generate_builtin_table_update(kind.clone(), diff));
197 }
198 }
199 }
200
201 if update_system_config {
202 self.system_configuration.dyncfg_updates();
203 }
204
205 Ok(builtin_table_updates)
206 }
207
208 #[instrument(level = "debug")]
209 fn apply_update(
210 &mut self,
211 kind: StateUpdateKind,
212 diff: StateDiff,
213 retractions: &mut InProgressRetractions,
214 local_expression_cache: &mut LocalExpressionCache,
215 ) -> Result<(), CatalogError> {
216 match kind {
217 StateUpdateKind::Role(role) => {
218 self.apply_role_update(role, diff, retractions);
219 }
220 StateUpdateKind::RoleAuth(role_auth) => {
221 self.apply_role_auth_update(role_auth, diff, retractions);
222 }
223 StateUpdateKind::Database(database) => {
224 self.apply_database_update(database, diff, retractions);
225 }
226 StateUpdateKind::Schema(schema) => {
227 self.apply_schema_update(schema, diff, retractions);
228 }
229 StateUpdateKind::DefaultPrivilege(default_privilege) => {
230 self.apply_default_privilege_update(default_privilege, diff, retractions);
231 }
232 StateUpdateKind::SystemPrivilege(system_privilege) => {
233 self.apply_system_privilege_update(system_privilege, diff, retractions);
234 }
235 StateUpdateKind::SystemConfiguration(system_configuration) => {
236 self.apply_system_configuration_update(system_configuration, diff, retractions);
237 }
238 StateUpdateKind::Cluster(cluster) => {
239 self.apply_cluster_update(cluster, diff, retractions);
240 }
241 StateUpdateKind::NetworkPolicy(network_policy) => {
242 self.apply_network_policy_update(network_policy, diff, retractions);
243 }
244 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
245 self.apply_introspection_source_index_update(
246 introspection_source_index,
247 diff,
248 retractions,
249 );
250 }
251 StateUpdateKind::ClusterReplica(cluster_replica) => {
252 self.apply_cluster_replica_update(cluster_replica, diff, retractions);
253 }
254 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
255 self.apply_system_object_mapping_update(
256 system_object_mapping,
257 diff,
258 retractions,
259 local_expression_cache,
260 );
261 }
262 StateUpdateKind::TemporaryItem(item) => {
263 self.apply_temporary_item_update(item, diff, retractions);
264 }
265 StateUpdateKind::Item(item) => {
266 self.apply_item_update(item, diff, retractions, local_expression_cache)?;
267 }
268 StateUpdateKind::Comment(comment) => {
269 self.apply_comment_update(comment, diff, retractions);
270 }
271 StateUpdateKind::SourceReferences(source_reference) => {
272 self.apply_source_references_update(source_reference, diff, retractions);
273 }
274 StateUpdateKind::AuditLog(_audit_log) => {
275 }
277 StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
278 self.apply_storage_collection_metadata_update(
279 storage_collection_metadata,
280 diff,
281 retractions,
282 );
283 }
284 StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
285 self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
286 }
287 }
288
289 Ok(())
290 }
291
292 #[instrument(level = "debug")]
293 fn apply_role_auth_update(
294 &mut self,
295 role_auth: mz_catalog::durable::RoleAuth,
296 diff: StateDiff,
297 retractions: &mut InProgressRetractions,
298 ) {
299 apply_with_update(
300 &mut self.role_auth_by_id,
301 role_auth,
302 |role_auth| role_auth.role_id,
303 diff,
304 &mut retractions.role_auths,
305 );
306 }
307
308 #[instrument(level = "debug")]
309 fn apply_role_update(
310 &mut self,
311 role: mz_catalog::durable::Role,
312 diff: StateDiff,
313 retractions: &mut InProgressRetractions,
314 ) {
315 apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
316 apply_with_update(
317 &mut self.roles_by_id,
318 role,
319 |role| role.id,
320 diff,
321 &mut retractions.roles,
322 );
323 }
324
325 #[instrument(level = "debug")]
326 fn apply_database_update(
327 &mut self,
328 database: mz_catalog::durable::Database,
329 diff: StateDiff,
330 retractions: &mut InProgressRetractions,
331 ) {
332 apply_inverted_lookup(
333 &mut self.database_by_name,
334 &database.name,
335 database.id,
336 diff,
337 );
338 apply_with_update(
339 &mut self.database_by_id,
340 database,
341 |database| database.id,
342 diff,
343 &mut retractions.databases,
344 );
345 }
346
347 #[instrument(level = "debug")]
348 fn apply_schema_update(
349 &mut self,
350 schema: mz_catalog::durable::Schema,
351 diff: StateDiff,
352 retractions: &mut InProgressRetractions,
353 ) {
354 let (schemas_by_id, schemas_by_name) = match &schema.database_id {
355 Some(database_id) => {
356 let db = self
357 .database_by_id
358 .get_mut(database_id)
359 .expect("catalog out of sync");
360 (&mut db.schemas_by_id, &mut db.schemas_by_name)
361 }
362 None => (
363 &mut self.ambient_schemas_by_id,
364 &mut self.ambient_schemas_by_name,
365 ),
366 };
367 apply_inverted_lookup(schemas_by_name, &schema.name, schema.id, diff);
368 apply_with_update(
369 schemas_by_id,
370 schema,
371 |schema| schema.id,
372 diff,
373 &mut retractions.schemas,
374 );
375 }
376
377 #[instrument(level = "debug")]
378 fn apply_default_privilege_update(
379 &mut self,
380 default_privilege: mz_catalog::durable::DefaultPrivilege,
381 diff: StateDiff,
382 _retractions: &mut InProgressRetractions,
383 ) {
384 match diff {
385 StateDiff::Addition => self
386 .default_privileges
387 .grant(default_privilege.object, default_privilege.acl_item),
388 StateDiff::Retraction => self
389 .default_privileges
390 .revoke(&default_privilege.object, &default_privilege.acl_item),
391 }
392 }
393
394 #[instrument(level = "debug")]
395 fn apply_system_privilege_update(
396 &mut self,
397 system_privilege: MzAclItem,
398 diff: StateDiff,
399 _retractions: &mut InProgressRetractions,
400 ) {
401 match diff {
402 StateDiff::Addition => self.system_privileges.grant(system_privilege),
403 StateDiff::Retraction => self.system_privileges.revoke(&system_privilege),
404 }
405 }
406
407 #[instrument(level = "debug")]
408 fn apply_system_configuration_update(
409 &mut self,
410 system_configuration: mz_catalog::durable::SystemConfiguration,
411 diff: StateDiff,
412 _retractions: &mut InProgressRetractions,
413 ) {
414 let res = match diff {
415 StateDiff::Addition => self.insert_system_configuration(
416 &system_configuration.name,
417 VarInput::Flat(&system_configuration.value),
418 ),
419 StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
420 };
421 match res {
422 Ok(_) => (),
423 Err(Error {
427 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
428 }) => {
429 warn!(%name, "unknown system parameter from catalog storage");
430 }
431 Err(e) => panic!("unable to update system variable: {e:?}"),
432 }
433 }
434
435 #[instrument(level = "debug")]
436 fn apply_cluster_update(
437 &mut self,
438 cluster: mz_catalog::durable::Cluster,
439 diff: StateDiff,
440 retractions: &mut InProgressRetractions,
441 ) {
442 apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
443 apply_with_update(
444 &mut self.clusters_by_id,
445 cluster,
446 |cluster| cluster.id,
447 diff,
448 &mut retractions.clusters,
449 );
450 }
451
452 #[instrument(level = "debug")]
453 fn apply_network_policy_update(
454 &mut self,
455 policy: mz_catalog::durable::NetworkPolicy,
456 diff: StateDiff,
457 retractions: &mut InProgressRetractions,
458 ) {
459 apply_inverted_lookup(
460 &mut self.network_policies_by_name,
461 &policy.name,
462 policy.id,
463 diff,
464 );
465 apply_with_update(
466 &mut self.network_policies_by_id,
467 policy,
468 |policy| policy.id,
469 diff,
470 &mut retractions.network_policies,
471 );
472 }
473
474 #[instrument(level = "debug")]
475 fn apply_introspection_source_index_update(
476 &mut self,
477 introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
478 diff: StateDiff,
479 retractions: &mut InProgressRetractions,
480 ) {
481 let cluster = self
482 .clusters_by_id
483 .get_mut(&introspection_source_index.cluster_id)
484 .expect("catalog out of sync");
485 let log = BUILTIN_LOG_LOOKUP
486 .get(introspection_source_index.name.as_str())
487 .expect("missing log");
488 apply_inverted_lookup(
489 &mut cluster.log_indexes,
490 &log.variant,
491 introspection_source_index.index_id,
492 diff,
493 );
494
495 match diff {
496 StateDiff::Addition => {
497 if let Some(mut entry) = retractions
498 .introspection_source_indexes
499 .remove(&introspection_source_index.item_id)
500 {
501 let (index_name, index) = self.create_introspection_source_index(
504 introspection_source_index.cluster_id,
505 log,
506 introspection_source_index.index_id,
507 );
508 assert_eq!(entry.id, introspection_source_index.item_id);
509 assert_eq!(entry.oid, introspection_source_index.oid);
510 assert_eq!(entry.name, index_name);
511 entry.item = index;
512 self.insert_entry(entry);
513 } else {
514 self.insert_introspection_source_index(
515 introspection_source_index.cluster_id,
516 log,
517 introspection_source_index.item_id,
518 introspection_source_index.index_id,
519 introspection_source_index.oid,
520 );
521 }
522 }
523 StateDiff::Retraction => {
524 let entry = self.drop_item(introspection_source_index.item_id);
525 retractions
526 .introspection_source_indexes
527 .insert(entry.id, entry);
528 }
529 }
530 }
531
532 #[instrument(level = "debug")]
533 fn apply_cluster_replica_update(
534 &mut self,
535 cluster_replica: mz_catalog::durable::ClusterReplica,
536 diff: StateDiff,
537 _retractions: &mut InProgressRetractions,
538 ) {
539 let cluster = self
540 .clusters_by_id
541 .get(&cluster_replica.cluster_id)
542 .expect("catalog out of sync");
543 let azs = cluster.availability_zones();
544 let location = self
545 .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
546 .expect("catalog in unexpected state");
547 let cluster = self
548 .clusters_by_id
549 .get_mut(&cluster_replica.cluster_id)
550 .expect("catalog out of sync");
551 apply_inverted_lookup(
552 &mut cluster.replica_id_by_name_,
553 &cluster_replica.name,
554 cluster_replica.replica_id,
555 diff,
556 );
557 match diff {
558 StateDiff::Retraction => {
559 let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
560 assert!(
561 prev.is_some(),
562 "retraction does not match existing value: {:?}",
563 cluster_replica.replica_id
564 );
565 }
566 StateDiff::Addition => {
567 let logging = ReplicaLogging {
568 log_logging: cluster_replica.config.logging.log_logging,
569 interval: cluster_replica.config.logging.interval,
570 };
571 let config = ReplicaConfig {
572 location,
573 compute: ComputeReplicaConfig { logging },
574 };
575 let mem_cluster_replica = ClusterReplica {
576 name: cluster_replica.name.clone(),
577 cluster_id: cluster_replica.cluster_id,
578 replica_id: cluster_replica.replica_id,
579 config,
580 owner_id: cluster_replica.owner_id,
581 };
582 let prev = cluster
583 .replicas_by_id_
584 .insert(cluster_replica.replica_id, mem_cluster_replica);
585 assert_eq!(
586 prev, None,
587 "values must be explicitly retracted before inserting a new value: {:?}",
588 cluster_replica.replica_id
589 );
590 }
591 }
592 }
593
594 #[instrument(level = "debug")]
595 fn apply_system_object_mapping_update(
596 &mut self,
597 system_object_mapping: mz_catalog::durable::SystemObjectMapping,
598 diff: StateDiff,
599 retractions: &mut InProgressRetractions,
600 local_expression_cache: &mut LocalExpressionCache,
601 ) {
602 let item_id = system_object_mapping.unique_identifier.catalog_id;
603 let global_id = system_object_mapping.unique_identifier.global_id;
604
605 if system_object_mapping.unique_identifier.runtime_alterable() {
606 return;
610 }
611
612 if let StateDiff::Retraction = diff {
613 let entry = self.drop_item(item_id);
614 retractions.system_object_mappings.insert(item_id, entry);
615 return;
616 }
617
618 if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
619 self.insert_entry(entry);
624 return;
625 }
626
627 let builtin = BUILTIN_LOOKUP
628 .get(&system_object_mapping.description)
629 .expect("missing builtin")
630 .1;
631 let schema_name = builtin.schema();
632 let schema_id = self
633 .ambient_schemas_by_name
634 .get(schema_name)
635 .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
636 let name = QualifiedItemName {
637 qualifiers: ItemQualifiers {
638 database_spec: ResolvedDatabaseSpecifier::Ambient,
639 schema_spec: SchemaSpecifier::Id(*schema_id),
640 },
641 item: builtin.name().into(),
642 };
643 match builtin {
644 Builtin::Log(log) => {
645 let mut acl_items = vec![rbac::owner_privilege(
646 mz_sql::catalog::ObjectType::Source,
647 MZ_SYSTEM_ROLE_ID,
648 )];
649 acl_items.extend_from_slice(&log.access);
650 self.insert_item(
651 item_id,
652 log.oid,
653 name.clone(),
654 CatalogItem::Log(Log {
655 variant: log.variant,
656 global_id,
657 }),
658 MZ_SYSTEM_ROLE_ID,
659 PrivilegeMap::from_mz_acl_items(acl_items),
660 );
661 }
662
663 Builtin::Table(table) => {
664 let mut acl_items = vec![rbac::owner_privilege(
665 mz_sql::catalog::ObjectType::Table,
666 MZ_SYSTEM_ROLE_ID,
667 )];
668 acl_items.extend_from_slice(&table.access);
669
670 self.insert_item(
671 item_id,
672 table.oid,
673 name.clone(),
674 CatalogItem::Table(Table {
675 create_sql: None,
676 desc: VersionedRelationDesc::new(table.desc.clone()),
677 collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
678 conn_id: None,
679 resolved_ids: ResolvedIds::empty(),
680 custom_logical_compaction_window: table.is_retained_metrics_object.then(
681 || {
682 self.system_config()
683 .metrics_retention()
684 .try_into()
685 .expect("invalid metrics retention")
686 },
687 ),
688 is_retained_metrics_object: table.is_retained_metrics_object,
689 data_source: TableDataSource::TableWrites {
690 defaults: vec![Expr::null(); table.desc.arity()],
691 },
692 }),
693 MZ_SYSTEM_ROLE_ID,
694 PrivilegeMap::from_mz_acl_items(acl_items),
695 );
696 }
697 Builtin::Index(index) => {
698 let custom_logical_compaction_window =
699 index.is_retained_metrics_object.then(|| {
700 self.system_config()
701 .metrics_retention()
702 .try_into()
703 .expect("invalid metrics retention")
704 });
705 let versions = BTreeMap::new();
707
708 let item = self
709 .parse_item(
710 global_id,
711 &index.create_sql(),
712 &versions,
713 None,
714 index.is_retained_metrics_object,
715 custom_logical_compaction_window,
716 local_expression_cache,
717 None,
718 )
719 .unwrap_or_else(|e| {
720 panic!(
721 "internal error: failed to load bootstrap index:\n\
722 {}\n\
723 error:\n\
724 {:?}\n\n\
725 make sure that the schema name is specified in the builtin index's create sql statement.",
726 index.name, e
727 )
728 });
729 let CatalogItem::Index(_) = item else {
730 panic!(
731 "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
732 index.name
733 );
734 };
735
736 self.insert_item(
737 item_id,
738 index.oid,
739 name,
740 item,
741 MZ_SYSTEM_ROLE_ID,
742 PrivilegeMap::default(),
743 );
744 }
745 Builtin::View(_) => {
746 unreachable!("views added elsewhere");
748 }
749
750 Builtin::Type(typ) => {
752 let typ = self.resolve_builtin_type_references(typ);
753 if let CatalogType::Array { element_reference } = typ.details.typ {
754 let entry = self.get_entry_mut(&element_reference);
755 let item_type = match &mut entry.item {
756 CatalogItem::Type(item_type) => item_type,
757 _ => unreachable!("types can only reference other types"),
758 };
759 item_type.details.array_id = Some(item_id);
760 }
761
762 let desc = None;
766 assert!(!matches!(typ.details.typ, CatalogType::Record { .. }));
767 let schema_id = self.resolve_system_schema(typ.schema);
768
769 self.insert_item(
770 item_id,
771 typ.oid,
772 QualifiedItemName {
773 qualifiers: ItemQualifiers {
774 database_spec: ResolvedDatabaseSpecifier::Ambient,
775 schema_spec: SchemaSpecifier::Id(schema_id),
776 },
777 item: typ.name.to_owned(),
778 },
779 CatalogItem::Type(Type {
780 create_sql: None,
781 global_id,
782 details: typ.details.clone(),
783 desc,
784 resolved_ids: ResolvedIds::empty(),
785 }),
786 MZ_SYSTEM_ROLE_ID,
787 PrivilegeMap::from_mz_acl_items(vec![
788 rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
789 rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
790 ]),
791 );
792 }
793
794 Builtin::Func(func) => {
795 let oid = INVALID_OID;
799 self.insert_item(
800 item_id,
801 oid,
802 name.clone(),
803 CatalogItem::Func(Func {
804 inner: func.inner,
805 global_id,
806 }),
807 MZ_SYSTEM_ROLE_ID,
808 PrivilegeMap::default(),
809 );
810 }
811
812 Builtin::Source(coll) => {
813 let mut acl_items = vec![rbac::owner_privilege(
814 mz_sql::catalog::ObjectType::Source,
815 MZ_SYSTEM_ROLE_ID,
816 )];
817 acl_items.extend_from_slice(&coll.access);
818
819 self.insert_item(
820 item_id,
821 coll.oid,
822 name.clone(),
823 CatalogItem::Source(Source {
824 create_sql: None,
825 data_source: DataSourceDesc::Introspection(coll.data_source),
826 desc: coll.desc.clone(),
827 global_id,
828 timeline: Timeline::EpochMilliseconds,
829 resolved_ids: ResolvedIds::empty(),
830 custom_logical_compaction_window: coll.is_retained_metrics_object.then(
831 || {
832 self.system_config()
833 .metrics_retention()
834 .try_into()
835 .expect("invalid metrics retention")
836 },
837 ),
838 is_retained_metrics_object: coll.is_retained_metrics_object,
839 }),
840 MZ_SYSTEM_ROLE_ID,
841 PrivilegeMap::from_mz_acl_items(acl_items),
842 );
843 }
844 Builtin::ContinualTask(ct) => {
845 let mut acl_items = vec![rbac::owner_privilege(
846 mz_sql::catalog::ObjectType::Source,
847 MZ_SYSTEM_ROLE_ID,
848 )];
849 acl_items.extend_from_slice(&ct.access);
850 let versions = BTreeMap::new();
852
853 let item = self
854 .parse_item(
855 global_id,
856 &ct.create_sql(),
857 &versions,
858 None,
859 false,
860 None,
861 local_expression_cache,
862 None,
863 )
864 .unwrap_or_else(|e| {
865 panic!(
866 "internal error: failed to load bootstrap continual task:\n\
867 {}\n\
868 error:\n\
869 {:?}\n\n\
870 make sure that the schema name is specified in the builtin continual task's create sql statement.",
871 ct.name, e
872 )
873 });
874 let CatalogItem::ContinualTask(_) = &item else {
875 panic!(
876 "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
877 ct.name
878 );
879 };
880
881 self.insert_item(
882 item_id,
883 ct.oid,
884 name,
885 item,
886 MZ_SYSTEM_ROLE_ID,
887 PrivilegeMap::from_mz_acl_items(acl_items),
888 );
889 }
890 Builtin::Connection(connection) => {
891 let versions = BTreeMap::new();
893 let mut item = self
894 .parse_item(
895 global_id,
896 connection.sql,
897 &versions,
898 None,
899 false,
900 None,
901 local_expression_cache,
902 None,
903 )
904 .unwrap_or_else(|e| {
905 panic!(
906 "internal error: failed to load bootstrap connection:\n\
907 {}\n\
908 error:\n\
909 {:?}\n\n\
910 make sure that the schema name is specified in the builtin connection's create sql statement.",
911 connection.name, e
912 )
913 });
914 let CatalogItem::Connection(_) = &mut item else {
915 panic!(
916 "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
917 connection.name
918 );
919 };
920
921 let mut acl_items = vec![rbac::owner_privilege(
922 mz_sql::catalog::ObjectType::Connection,
923 connection.owner_id.clone(),
924 )];
925 acl_items.extend_from_slice(connection.access);
926
927 self.insert_item(
928 item_id,
929 connection.oid,
930 name.clone(),
931 item,
932 connection.owner_id.clone(),
933 PrivilegeMap::from_mz_acl_items(acl_items),
934 );
935 }
936 }
937 }
938
939 #[instrument(level = "debug")]
940 fn apply_temporary_item_update(
941 &mut self,
942 TemporaryItem {
943 id,
944 oid,
945 name,
946 item,
947 owner_id,
948 privileges,
949 }: TemporaryItem,
950 diff: StateDiff,
951 retractions: &mut InProgressRetractions,
952 ) {
953 match diff {
954 StateDiff::Addition => {
955 let entry = match retractions.temp_items.remove(&id) {
956 Some(mut retraction) => {
957 assert_eq!(retraction.id, id);
958 retraction.item = item;
959 retraction.id = id;
960 retraction.oid = oid;
961 retraction.name = name;
962 retraction.owner_id = owner_id;
963 retraction.privileges = privileges;
964 retraction
965 }
966 None => CatalogEntry {
967 item,
968 referenced_by: Vec::new(),
969 used_by: Vec::new(),
970 id,
971 oid,
972 name,
973 owner_id,
974 privileges,
975 },
976 };
977 self.insert_entry(entry);
978 }
979 StateDiff::Retraction => {
980 let entry = self.drop_item(id);
981 retractions.temp_items.insert(id, entry);
982 }
983 }
984 }
985
986 #[instrument(level = "debug")]
987 fn apply_item_update(
988 &mut self,
989 item: mz_catalog::durable::Item,
990 diff: StateDiff,
991 retractions: &mut InProgressRetractions,
992 local_expression_cache: &mut LocalExpressionCache,
993 ) -> Result<(), CatalogError> {
994 match diff {
995 StateDiff::Addition => {
996 let key = item.key();
997 let mz_catalog::durable::Item {
998 id,
999 oid,
1000 global_id,
1001 schema_id,
1002 name,
1003 create_sql,
1004 owner_id,
1005 privileges,
1006 extra_versions,
1007 } = item;
1008 let schema = self.find_non_temp_schema(&schema_id);
1009 let name = QualifiedItemName {
1010 qualifiers: ItemQualifiers {
1011 database_spec: schema.database().clone(),
1012 schema_spec: schema.id().clone(),
1013 },
1014 item: name.clone(),
1015 };
1016 let entry = match retractions.items.remove(&key) {
1017 Some(mut retraction) => {
1018 assert_eq!(retraction.id, item.id);
1019 if retraction.create_sql() != create_sql {
1024 let item = self
1025 .deserialize_item(
1026 global_id,
1027 &create_sql,
1028 &extra_versions,
1029 local_expression_cache,
1030 Some(retraction.item),
1031 )
1032 .unwrap_or_else(|e| {
1033 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1034 });
1035 retraction.item = item;
1036 }
1037 retraction.id = id;
1038 retraction.oid = oid;
1039 retraction.name = name;
1040 retraction.owner_id = owner_id;
1041 retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1042
1043 retraction
1044 }
1045 None => {
1046 let catalog_item = self
1047 .deserialize_item(
1048 global_id,
1049 &create_sql,
1050 &extra_versions,
1051 local_expression_cache,
1052 None,
1053 )
1054 .unwrap_or_else(|e| {
1055 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1056 });
1057 CatalogEntry {
1058 item: catalog_item,
1059 referenced_by: Vec::new(),
1060 used_by: Vec::new(),
1061 id,
1062 oid,
1063 name,
1064 owner_id,
1065 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1066 }
1067 }
1068 };
1069
1070 self.insert_entry(entry);
1071 }
1072 StateDiff::Retraction => {
1073 let entry = self.drop_item(item.id);
1074 let key = item.into_key_value().0;
1075 retractions.items.insert(key, entry);
1076 }
1077 }
1078 Ok(())
1079 }
1080
1081 #[instrument(level = "debug")]
1082 fn apply_comment_update(
1083 &mut self,
1084 comment: mz_catalog::durable::Comment,
1085 diff: StateDiff,
1086 _retractions: &mut InProgressRetractions,
1087 ) {
1088 match diff {
1089 StateDiff::Addition => {
1090 let prev = self.comments.update_comment(
1091 comment.object_id,
1092 comment.sub_component,
1093 Some(comment.comment),
1094 );
1095 assert_eq!(
1096 prev, None,
1097 "values must be explicitly retracted before inserting a new value"
1098 );
1099 }
1100 StateDiff::Retraction => {
1101 let prev =
1102 self.comments
1103 .update_comment(comment.object_id, comment.sub_component, None);
1104 assert_eq!(
1105 prev,
1106 Some(comment.comment),
1107 "retraction does not match existing value: ({:?}, {:?})",
1108 comment.object_id,
1109 comment.sub_component,
1110 );
1111 }
1112 }
1113 }
1114
1115 #[instrument(level = "debug")]
1116 fn apply_source_references_update(
1117 &mut self,
1118 source_references: mz_catalog::durable::SourceReferences,
1119 diff: StateDiff,
1120 _retractions: &mut InProgressRetractions,
1121 ) {
1122 match diff {
1123 StateDiff::Addition => {
1124 let prev = self
1125 .source_references
1126 .insert(source_references.source_id, source_references.into());
1127 assert!(
1128 prev.is_none(),
1129 "values must be explicitly retracted before inserting a new value: {prev:?}"
1130 );
1131 }
1132 StateDiff::Retraction => {
1133 let prev = self.source_references.remove(&source_references.source_id);
1134 assert!(
1135 prev.is_some(),
1136 "retraction for a non-existent existing value: {source_references:?}"
1137 );
1138 }
1139 }
1140 }
1141
1142 #[instrument(level = "debug")]
1143 fn apply_storage_collection_metadata_update(
1144 &mut self,
1145 storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1146 diff: StateDiff,
1147 _retractions: &mut InProgressRetractions,
1148 ) {
1149 apply_inverted_lookup(
1150 &mut self.storage_metadata.collection_metadata,
1151 &storage_collection_metadata.id,
1152 storage_collection_metadata.shard,
1153 diff,
1154 );
1155 }
1156
1157 #[instrument(level = "debug")]
1158 fn apply_unfinalized_shard_update(
1159 &mut self,
1160 unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1161 diff: StateDiff,
1162 _retractions: &mut InProgressRetractions,
1163 ) {
1164 match diff {
1165 StateDiff::Addition => {
1166 let newly_inserted = self
1167 .storage_metadata
1168 .unfinalized_shards
1169 .insert(unfinalized_shard.shard);
1170 assert!(
1171 newly_inserted,
1172 "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1173 );
1174 }
1175 StateDiff::Retraction => {
1176 let removed = self
1177 .storage_metadata
1178 .unfinalized_shards
1179 .remove(&unfinalized_shard.shard);
1180 assert!(
1181 removed,
1182 "retraction does not match existing value: {unfinalized_shard:?}"
1183 );
1184 }
1185 }
1186 }
1187
1188 #[instrument]
1191 pub(crate) fn generate_builtin_table_updates(
1192 &self,
1193 updates: Vec<StateUpdate>,
1194 ) -> Vec<BuiltinTableUpdate> {
1195 let mut builtin_table_updates = Vec::new();
1196 for StateUpdate { kind, ts: _, diff } in updates {
1197 let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1198 let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1199 builtin_table_updates.extend(builtin_table_update);
1200 }
1201 builtin_table_updates
1202 }
1203
1204 #[instrument(level = "debug")]
1207 pub(crate) fn generate_builtin_table_update(
1208 &self,
1209 kind: StateUpdateKind,
1210 diff: StateDiff,
1211 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1212 let diff = diff.into();
1213 match kind {
1214 StateUpdateKind::Role(role) => {
1215 let mut builtin_table_updates = self.pack_role_update(role.id, diff);
1216 for group_id in role.membership.map.keys() {
1217 builtin_table_updates
1218 .push(self.pack_role_members_update(*group_id, role.id, diff))
1219 }
1220 builtin_table_updates
1221 }
1222 StateUpdateKind::RoleAuth(role_auth) => {
1223 vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1224 }
1225 StateUpdateKind::Database(database) => {
1226 vec![self.pack_database_update(&database.id, diff)]
1227 }
1228 StateUpdateKind::Schema(schema) => {
1229 let db_spec = schema.database_id.into();
1230 vec![self.pack_schema_update(&db_spec, &schema.id, diff)]
1231 }
1232 StateUpdateKind::DefaultPrivilege(default_privilege) => {
1233 vec![self.pack_default_privileges_update(
1234 &default_privilege.object,
1235 &default_privilege.acl_item.grantee,
1236 &default_privilege.acl_item.acl_mode,
1237 diff,
1238 )]
1239 }
1240 StateUpdateKind::SystemPrivilege(system_privilege) => {
1241 vec![self.pack_system_privileges_update(system_privilege, diff)]
1242 }
1243 StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1244 StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
1245 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1246 self.pack_item_update(introspection_source_index.item_id, diff)
1247 }
1248 StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
1249 cluster_replica.cluster_id,
1250 &cluster_replica.name,
1251 diff,
1252 ),
1253 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1254 if !system_object_mapping.unique_identifier.runtime_alterable() {
1258 self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1259 } else {
1260 vec![]
1261 }
1262 }
1263 StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1264 StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1265 StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1266 comment.object_id,
1267 comment.sub_component,
1268 &comment.comment,
1269 diff,
1270 )],
1271 StateUpdateKind::SourceReferences(source_references) => {
1272 self.pack_source_references_update(&source_references, diff)
1273 }
1274 StateUpdateKind::AuditLog(audit_log) => {
1275 vec![
1276 self.pack_audit_log_update(&audit_log.event, diff)
1277 .expect("could not pack audit log update"),
1278 ]
1279 }
1280 StateUpdateKind::NetworkPolicy(policy) => self
1281 .pack_network_policy_update(&policy.id, diff)
1282 .expect("could not pack audit log update"),
1283 StateUpdateKind::StorageCollectionMetadata(_)
1284 | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1285 }
1286 }
1287
1288 fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1289 self.entry_by_id
1290 .get_mut(id)
1291 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1292 }
1293
1294 fn get_schema_mut(
1295 &mut self,
1296 database_spec: &ResolvedDatabaseSpecifier,
1297 schema_spec: &SchemaSpecifier,
1298 conn_id: &ConnectionId,
1299 ) -> &mut Schema {
1300 match (database_spec, schema_spec) {
1302 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1303 .temporary_schemas
1304 .get_mut(conn_id)
1305 .expect("catalog out of sync"),
1306 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1307 .ambient_schemas_by_id
1308 .get_mut(id)
1309 .expect("catalog out of sync"),
1310 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1311 .database_by_id
1312 .get_mut(database_id)
1313 .expect("catalog out of sync")
1314 .schemas_by_id
1315 .get_mut(schema_id)
1316 .expect("catalog out of sync"),
1317 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1318 unreachable!("temporary schemas are in the ambient database")
1319 }
1320 }
1321 }
1322
1323 #[instrument(name = "catalog::parse_views")]
1333 async fn parse_builtin_views(
1334 state: &mut CatalogState,
1335 builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
1336 retractions: &mut InProgressRetractions,
1337 local_expression_cache: &mut LocalExpressionCache,
1338 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1339 let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
1340 let (updates, additions): (Vec<_>, Vec<_>) =
1341 builtin_views
1342 .into_iter()
1343 .partition_map(|(view, item_id, gid)| {
1344 match retractions.system_object_mappings.remove(&item_id) {
1345 Some(entry) => Either::Left(entry),
1346 None => Either::Right((view, item_id, gid)),
1347 }
1348 });
1349
1350 for entry in updates {
1351 let item_id = entry.id();
1356 state.insert_entry(entry);
1357 builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
1358 }
1359
1360 let mut handles = Vec::new();
1361 let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
1362 BTreeMap::new();
1363 let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
1364 let mut awaiting_all = Vec::new();
1367 let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
1369 let mut completed_names: BTreeSet<String> = BTreeSet::new();
1370
1371 let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
1373 .into_iter()
1374 .map(|(view, item_id, gid)| (item_id, (view, gid)))
1375 .collect();
1376 let item_ids: Vec<_> = views.keys().copied().collect();
1377
1378 let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
1379 while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
1380 if handles.is_empty() && ready.is_empty() {
1381 ready.extend(awaiting_all.drain(..));
1383 }
1384
1385 if !ready.is_empty() {
1387 let spawn_state = Arc::new(state.clone());
1388 while let Some(id) = ready.pop_front() {
1389 let (view, global_id) = views.get(&id).expect("must exist");
1390 let global_id = *global_id;
1391 let create_sql = view.create_sql();
1392 let versions = BTreeMap::new();
1394
1395 let span = info_span!(parent: None, "parse builtin view", name = view.name);
1396 OpenTelemetryContext::obtain().attach_as_parent_to(&span);
1397 let task_state = Arc::clone(&spawn_state);
1398 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1399 let handle = mz_ore::task::spawn(
1400 || "parse view",
1401 async move {
1402 let res = task_state.parse_item_inner(
1403 global_id,
1404 &create_sql,
1405 &versions,
1406 None,
1407 false,
1408 None,
1409 cached_expr,
1410 None,
1411 );
1412 (id, global_id, res)
1413 }
1414 .instrument(span),
1415 );
1416 handles.push(handle);
1417 }
1418 }
1419
1420 let (handle, _idx, remaining) = future::select_all(handles).await;
1422 handles = remaining;
1423 let (id, global_id, res) = handle.expect("must join");
1424 let mut insert_cached_expr = |cached_expr| {
1425 if let Some(cached_expr) = cached_expr {
1426 local_expression_cache.insert_cached_expression(global_id, cached_expr);
1427 }
1428 };
1429 match res {
1430 Ok((item, uncached_expr)) => {
1431 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1432 local_expression_cache.insert_uncached_expression(
1433 global_id,
1434 uncached_expr,
1435 optimizer_features,
1436 );
1437 }
1438 let (view, _gid) = views.remove(&id).expect("must exist");
1440 let schema_id = state
1441 .ambient_schemas_by_name
1442 .get(view.schema)
1443 .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
1444 let qname = QualifiedItemName {
1445 qualifiers: ItemQualifiers {
1446 database_spec: ResolvedDatabaseSpecifier::Ambient,
1447 schema_spec: SchemaSpecifier::Id(*schema_id),
1448 },
1449 item: view.name.into(),
1450 };
1451 let mut acl_items = vec![rbac::owner_privilege(
1452 mz_sql::catalog::ObjectType::View,
1453 MZ_SYSTEM_ROLE_ID,
1454 )];
1455 acl_items.extend_from_slice(&view.access);
1456
1457 state.insert_item(
1458 id,
1459 view.oid,
1460 qname,
1461 item,
1462 MZ_SYSTEM_ROLE_ID,
1463 PrivilegeMap::from_mz_acl_items(acl_items),
1464 );
1465
1466 let mut resolved_dependent_items = Vec::new();
1468 if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
1469 resolved_dependent_items.extend(dependent_items);
1470 }
1471 let entry = state.get_entry(&id);
1472 let full_name = state.resolve_full_name(entry.name(), None).to_string();
1473 if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
1474 resolved_dependent_items.extend(dependent_items);
1475 }
1476 ready.extend(resolved_dependent_items);
1477
1478 completed_ids.insert(id);
1479 completed_names.insert(full_name);
1480 }
1481 Err((
1483 AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
1484 cached_expr,
1485 )) => {
1486 insert_cached_expr(cached_expr);
1487 if completed_ids.contains(&missing_dep) {
1488 ready.push_back(id);
1489 } else {
1490 awaiting_id_dependencies
1491 .entry(missing_dep)
1492 .or_default()
1493 .push(id);
1494 }
1495 }
1496 Err((
1498 AdapterError::PlanError(plan::PlanError::Catalog(
1499 SqlCatalogError::UnknownItem(missing_dep),
1500 )),
1501 cached_expr,
1502 )) => {
1503 insert_cached_expr(cached_expr);
1504 match CatalogItemId::from_str(&missing_dep) {
1505 Ok(missing_dep) => {
1506 if completed_ids.contains(&missing_dep) {
1507 ready.push_back(id);
1508 } else {
1509 awaiting_id_dependencies
1510 .entry(missing_dep)
1511 .or_default()
1512 .push(id);
1513 }
1514 }
1515 Err(_) => {
1516 if completed_names.contains(&missing_dep) {
1517 ready.push_back(id);
1518 } else {
1519 awaiting_name_dependencies
1520 .entry(missing_dep)
1521 .or_default()
1522 .push(id);
1523 }
1524 }
1525 }
1526 }
1527 Err((
1528 AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
1529 cached_expr,
1530 )) => {
1531 insert_cached_expr(cached_expr);
1532 awaiting_all.push(id);
1533 }
1534 Err((e, _)) => {
1535 let (bad_view, _gid) = views.get(&id).expect("must exist");
1536 panic!(
1537 "internal error: failed to load bootstrap view:\n\
1538 {name}\n\
1539 error:\n\
1540 {e:?}\n\n\
1541 Make sure that the schema name is specified in the builtin view's create sql statement.
1542 ",
1543 name = bad_view.name,
1544 )
1545 }
1546 }
1547 }
1548
1549 assert!(awaiting_id_dependencies.is_empty());
1550 assert!(
1551 awaiting_name_dependencies.is_empty(),
1552 "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
1553 );
1554 assert!(awaiting_all.is_empty());
1555 assert!(views.is_empty());
1556
1557 builtin_table_updates.extend(
1559 item_ids
1560 .into_iter()
1561 .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
1562 );
1563
1564 builtin_table_updates
1565 }
1566
1567 fn insert_entry(&mut self, entry: CatalogEntry) {
1569 if !entry.id.is_system() {
1570 if let Some(cluster_id) = entry.item.cluster_id() {
1571 self.clusters_by_id
1572 .get_mut(&cluster_id)
1573 .expect("catalog out of sync")
1574 .bound_objects
1575 .insert(entry.id);
1576 };
1577 }
1578
1579 for u in entry.references().items() {
1580 match self.entry_by_id.get_mut(u) {
1581 Some(metadata) => metadata.referenced_by.push(entry.id()),
1582 None => panic!(
1583 "Catalog: missing dependent catalog item {} while installing {}",
1584 &u,
1585 self.resolve_full_name(entry.name(), entry.conn_id())
1586 ),
1587 }
1588 }
1589 for u in entry.uses() {
1590 if u == entry.id() {
1593 continue;
1594 }
1595 match self.entry_by_id.get_mut(&u) {
1596 Some(metadata) => metadata.used_by.push(entry.id()),
1597 None => panic!(
1598 "Catalog: missing dependent catalog item {} while installing {}",
1599 &u,
1600 self.resolve_full_name(entry.name(), entry.conn_id())
1601 ),
1602 }
1603 }
1604 for gid in entry.item.global_ids() {
1605 self.entry_by_global_id.insert(gid, entry.id());
1606 }
1607 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1608 let schema = self.get_schema_mut(
1609 &entry.name().qualifiers.database_spec,
1610 &entry.name().qualifiers.schema_spec,
1611 conn_id,
1612 );
1613
1614 let prev_id = match entry.item() {
1615 CatalogItem::Func(_) => schema
1616 .functions
1617 .insert(entry.name().item.clone(), entry.id()),
1618 CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1619 _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1620 };
1621
1622 assert!(
1623 prev_id.is_none(),
1624 "builtin name collision on {:?}",
1625 entry.name().item.clone()
1626 );
1627
1628 self.entry_by_id.insert(entry.id(), entry.clone());
1629 }
1630
1631 fn insert_item(
1633 &mut self,
1634 id: CatalogItemId,
1635 oid: u32,
1636 name: QualifiedItemName,
1637 item: CatalogItem,
1638 owner_id: RoleId,
1639 privileges: PrivilegeMap,
1640 ) {
1641 let entry = CatalogEntry {
1642 item,
1643 name,
1644 id,
1645 oid,
1646 used_by: Vec::new(),
1647 referenced_by: Vec::new(),
1648 owner_id,
1649 privileges,
1650 };
1651
1652 self.insert_entry(entry);
1653 }
1654
1655 #[mz_ore::instrument(level = "trace")]
1656 fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
1657 let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
1658 for u in metadata.references().items() {
1659 if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
1660 dep_metadata.referenced_by.retain(|u| *u != metadata.id())
1661 }
1662 }
1663 for u in metadata.uses() {
1664 if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
1665 dep_metadata.used_by.retain(|u| *u != metadata.id())
1666 }
1667 }
1668 for gid in metadata.global_ids() {
1669 self.entry_by_global_id.remove(&gid);
1670 }
1671
1672 let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1673 let schema = self.get_schema_mut(
1674 &metadata.name().qualifiers.database_spec,
1675 &metadata.name().qualifiers.schema_spec,
1676 conn_id,
1677 );
1678 if metadata.item_type() == CatalogItemType::Type {
1679 schema
1680 .types
1681 .remove(&metadata.name().item)
1682 .expect("catalog out of sync");
1683 } else {
1684 assert_ne!(metadata.item_type(), CatalogItemType::Func);
1687
1688 schema
1689 .items
1690 .remove(&metadata.name().item)
1691 .expect("catalog out of sync");
1692 };
1693
1694 if !id.is_system() {
1695 if let Some(cluster_id) = metadata.item().cluster_id() {
1696 assert!(
1697 self.clusters_by_id
1698 .get_mut(&cluster_id)
1699 .expect("catalog out of sync")
1700 .bound_objects
1701 .remove(&id),
1702 "catalog out of sync"
1703 );
1704 }
1705 }
1706
1707 metadata
1708 }
1709
1710 fn insert_introspection_source_index(
1711 &mut self,
1712 cluster_id: ClusterId,
1713 log: &'static BuiltinLog,
1714 item_id: CatalogItemId,
1715 global_id: GlobalId,
1716 oid: u32,
1717 ) {
1718 let (index_name, index) =
1719 self.create_introspection_source_index(cluster_id, log, global_id);
1720 self.insert_item(
1721 item_id,
1722 oid,
1723 index_name,
1724 index,
1725 MZ_SYSTEM_ROLE_ID,
1726 PrivilegeMap::default(),
1727 );
1728 }
1729
1730 fn create_introspection_source_index(
1731 &self,
1732 cluster_id: ClusterId,
1733 log: &'static BuiltinLog,
1734 global_id: GlobalId,
1735 ) -> (QualifiedItemName, CatalogItem) {
1736 let source_name = FullItemName {
1737 database: RawDatabaseSpecifier::Ambient,
1738 schema: log.schema.into(),
1739 item: log.name.into(),
1740 };
1741 let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
1742 let mut index_name = QualifiedItemName {
1743 qualifiers: ItemQualifiers {
1744 database_spec: ResolvedDatabaseSpecifier::Ambient,
1745 schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
1746 },
1747 item: index_name.clone(),
1748 };
1749 index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
1750 let index_item_name = index_name.item.clone();
1751 let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
1752 let index = CatalogItem::Index(Index {
1753 global_id,
1754 on: log_global_id,
1755 keys: log
1756 .variant
1757 .index_by()
1758 .into_iter()
1759 .map(MirScalarExpr::column)
1760 .collect(),
1761 create_sql: index_sql(
1762 index_item_name,
1763 cluster_id,
1764 source_name,
1765 &log.variant.desc(),
1766 &log.variant.index_by(),
1767 ),
1768 conn_id: None,
1769 resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
1770 cluster_id,
1771 is_retained_metrics_object: false,
1772 custom_logical_compaction_window: None,
1773 });
1774 (index_name, index)
1775 }
1776
1777 fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
1782 Ok(self.system_configuration.set(name, value)?)
1783 }
1784
1785 fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
1790 Ok(self.system_configuration.reset(name)?)
1791 }
1792}
1793
1794fn sort_updates(mut updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1796 let mut sorted_updates = Vec::with_capacity(updates.len());
1797
1798 updates.sort_by_key(|update| update.ts);
1799 for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
1800 let sorted_ts_updates = sort_updates_inner(updates.collect());
1801 sorted_updates.extend(sorted_ts_updates);
1802 }
1803
1804 sorted_updates
1805}
1806
1807fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1809 fn push_update<T>(
1810 update: T,
1811 diff: StateDiff,
1812 retractions: &mut Vec<T>,
1813 additions: &mut Vec<T>,
1814 ) {
1815 match diff {
1816 StateDiff::Retraction => retractions.push(update),
1817 StateDiff::Addition => additions.push(update),
1818 }
1819 }
1820
1821 soft_assert_no_log!(
1822 updates.iter().map(|update| update.ts).all_equal(),
1823 "all timestamps should be equal: {updates:?}"
1824 );
1825
1826 let mut pre_cluster_retractions = Vec::new();
1828 let mut pre_cluster_additions = Vec::new();
1829 let mut cluster_retractions = Vec::new();
1830 let mut cluster_additions = Vec::new();
1831 let mut builtin_item_updates = Vec::new();
1832 let mut item_retractions = Vec::new();
1833 let mut item_additions = Vec::new();
1834 let mut temp_item_retractions = Vec::new();
1835 let mut temp_item_additions = Vec::new();
1836 let mut post_item_retractions = Vec::new();
1837 let mut post_item_additions = Vec::new();
1838 for update in updates {
1839 let diff = update.diff.clone();
1840 match update.kind {
1841 StateUpdateKind::Role(_)
1842 | StateUpdateKind::RoleAuth(_)
1843 | StateUpdateKind::Database(_)
1844 | StateUpdateKind::Schema(_)
1845 | StateUpdateKind::DefaultPrivilege(_)
1846 | StateUpdateKind::SystemPrivilege(_)
1847 | StateUpdateKind::SystemConfiguration(_)
1848 | StateUpdateKind::NetworkPolicy(_) => push_update(
1849 update,
1850 diff,
1851 &mut pre_cluster_retractions,
1852 &mut pre_cluster_additions,
1853 ),
1854 StateUpdateKind::Cluster(_)
1855 | StateUpdateKind::IntrospectionSourceIndex(_)
1856 | StateUpdateKind::ClusterReplica(_) => push_update(
1857 update,
1858 diff,
1859 &mut cluster_retractions,
1860 &mut cluster_additions,
1861 ),
1862 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1863 builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
1864 }
1865 StateUpdateKind::TemporaryItem(item) => push_update(
1866 (item, update.ts, update.diff),
1867 diff,
1868 &mut temp_item_retractions,
1869 &mut temp_item_additions,
1870 ),
1871 StateUpdateKind::Item(item) => push_update(
1872 (item, update.ts, update.diff),
1873 diff,
1874 &mut item_retractions,
1875 &mut item_additions,
1876 ),
1877 StateUpdateKind::Comment(_)
1878 | StateUpdateKind::SourceReferences(_)
1879 | StateUpdateKind::AuditLog(_)
1880 | StateUpdateKind::StorageCollectionMetadata(_)
1881 | StateUpdateKind::UnfinalizedShard(_) => push_update(
1882 update,
1883 diff,
1884 &mut post_item_retractions,
1885 &mut post_item_additions,
1886 ),
1887 }
1888 }
1889
1890 let builtin_item_updates = builtin_item_updates
1892 .into_iter()
1893 .map(|(system_object_mapping, ts, diff)| {
1894 let idx = BUILTIN_LOOKUP
1895 .get(&system_object_mapping.description)
1896 .expect("missing builtin")
1897 .0;
1898 (idx, system_object_mapping, ts, diff)
1899 })
1900 .sorted_by_key(|(idx, _, _, _)| *idx)
1901 .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
1902
1903 let mut other_builtin_retractions = Vec::new();
1905 let mut other_builtin_additions = Vec::new();
1906 let mut builtin_index_retractions = Vec::new();
1907 let mut builtin_index_additions = Vec::new();
1908 for (builtin_item_update, ts, diff) in builtin_item_updates {
1909 match &builtin_item_update.description.object_type {
1910 CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
1911 StateUpdate {
1912 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
1913 ts,
1914 diff,
1915 },
1916 diff,
1917 &mut builtin_index_retractions,
1918 &mut builtin_index_additions,
1919 ),
1920 CatalogItemType::Table
1921 | CatalogItemType::Source
1922 | CatalogItemType::Sink
1923 | CatalogItemType::View
1924 | CatalogItemType::MaterializedView
1925 | CatalogItemType::Type
1926 | CatalogItemType::Func
1927 | CatalogItemType::Secret
1928 | CatalogItemType::Connection => push_update(
1929 StateUpdate {
1930 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
1931 ts,
1932 diff,
1933 },
1934 diff,
1935 &mut other_builtin_retractions,
1936 &mut other_builtin_additions,
1937 ),
1938 }
1939 }
1940
1941 fn sort_connections(connections: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
1948 let mut topo: BTreeMap<
1949 (mz_catalog::durable::Item, Timestamp, StateDiff),
1950 BTreeSet<CatalogItemId>,
1951 > = BTreeMap::default();
1952 let existing_connections: BTreeSet<_> = connections.iter().map(|item| item.0.id).collect();
1953
1954 tracing::debug!(?connections, "sorting connections");
1956 for (connection, ts, diff) in connections.drain(..) {
1957 let statement = mz_sql::parse::parse(&connection.create_sql)
1958 .expect("valid CONNECTION create_sql")
1959 .into_element()
1960 .ast;
1961 let mut dependencies = mz_sql::names::dependencies(&statement)
1962 .expect("failed to find dependencies of CONNECTION");
1963 dependencies.remove(&connection.id);
1965 dependencies.retain(|dep| existing_connections.contains(dep));
1968
1969 assert_none!(topo.insert((connection, ts, diff), dependencies));
1971 }
1972 tracing::debug!(?topo, ?existing_connections, "built topological sort");
1973
1974 while !topo.is_empty() {
1976 let no_deps: Vec<_> = topo
1978 .iter()
1979 .filter_map(|(item, deps)| {
1980 if deps.is_empty() {
1981 Some(item.clone())
1982 } else {
1983 None
1984 }
1985 })
1986 .collect();
1987
1988 if no_deps.is_empty() {
1990 panic!("programming error, cycle in Connections");
1991 }
1992
1993 for item in no_deps {
1995 topo.remove(&item);
1997 topo.values_mut().for_each(|deps| {
1999 deps.remove(&item.0.id);
2000 });
2001 connections.push(item);
2003 }
2004 }
2005 }
2006
    fn sort_item_updates(
        item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
    ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in item_updates {
            match update.0.item_type() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut sources,
            &mut tables,
            &mut derived_items,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        sort_connections(&mut connections);

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }

    let item_retractions = sort_item_updates(item_retractions);
    let item_additions = sort_item_updates(item_additions);

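    /// Sorts temporary item updates into a safe apply order.
    ///
    /// Mirrors `sort_item_updates`, but operates on in-memory [`TemporaryItem`]s and orders
    /// connections by ID rather than topologically.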
    fn sort_temp_item_updates(
        temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
        let mut types = Vec::new();
        let mut funcs = Vec::new();
        let mut secrets = Vec::new();
        let mut connections = Vec::new();
        let mut sources = Vec::new();
        let mut tables = Vec::new();
        let mut derived_items = Vec::new();
        let mut sinks = Vec::new();
        let mut continual_tasks = Vec::new();

        for update in temp_item_updates {
            match update.0.item.typ() {
                CatalogItemType::Type => types.push(update),
                CatalogItemType::Func => funcs.push(update),
                CatalogItemType::Secret => secrets.push(update),
                CatalogItemType::Connection => connections.push(update),
                CatalogItemType::Source => sources.push(update),
                CatalogItemType::Table => tables.push(update),
                CatalogItemType::View
                | CatalogItemType::MaterializedView
                | CatalogItemType::Index => derived_items.push(update),
                CatalogItemType::Sink => sinks.push(update),
                CatalogItemType::ContinualTask => continual_tasks.push(update),
            }
        }

        for group in [
            &mut types,
            &mut funcs,
            &mut secrets,
            &mut connections,
            &mut sources,
            &mut tables,
            &mut derived_items,
            &mut sinks,
            &mut continual_tasks,
        ] {
            group.sort_by_key(|(item, _, _)| item.id);
        }

        iter::empty()
            .chain(types)
            .chain(funcs)
            .chain(secrets)
            .chain(connections)
            .chain(sources)
            .chain(tables)
            .chain(derived_items)
            .chain(sinks)
            .chain(continual_tasks)
            .collect()
    }
    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
    let temp_item_additions = sort_temp_item_updates(temp_item_additions);

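    /// Interleaves sorted durable and temporary item updates into a single list of
    /// [`StateUpdate`]s.
    ///
    /// The two queues are merged by repeatedly taking whichever front element has the smaller
    /// ID, preserving the relative order within each queue; a durable item and a temporary
    /// item can never share an ID.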
    fn merge_item_updates(
        mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
        mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
    ) -> Vec<StateUpdate> {
        let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());

        while let (Some((item, _, _)), Some((temp_item, _, _))) =
            (item_updates.front(), temp_item_updates.front())
        {
            if item.id < temp_item.id {
                let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::Item(item),
                    ts,
                    diff,
                });
            } else if item.id > temp_item.id {
                let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
                state_updates.push(StateUpdate {
                    kind: StateUpdateKind::TemporaryItem(temp_item),
                    ts,
                    diff,
                });
            } else {
                unreachable!(
                    "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
                );
            }
        }

        while let Some((item, ts, diff)) = item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::Item(item),
                ts,
                diff,
            });
        }

        while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
            state_updates.push(StateUpdate {
                kind: StateUpdateKind::TemporaryItem(temp_item),
                ts,
                diff,
            });
        }

        state_updates
    }
    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
    let item_additions = merge_item_updates(item_additions, temp_item_additions);

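    // Retractions are applied in reverse dependency order (dependents before their
    // dependencies), while additions are applied in dependency order.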
    iter::empty()
        .chain(post_item_retractions.into_iter().rev())
        .chain(item_retractions.into_iter().rev())
        .chain(builtin_index_retractions.into_iter().rev())
        .chain(cluster_retractions.into_iter().rev())
        .chain(other_builtin_retractions.into_iter().rev())
        .chain(pre_cluster_retractions.into_iter().rev())
        .chain(pre_cluster_additions)
        .chain(other_builtin_additions)
        .chain(cluster_additions)
        .chain(builtin_index_additions)
        .chain(item_additions)
        .chain(post_item_additions)
        .collect()
}

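/// A batch of bootstrap updates, grouped by how they must be applied.
///
/// Builtin view additions are parsed via [`CatalogState::parse_builtin_views`], item updates
/// are applied with item-parsing overrides enabled, and all remaining updates are applied
/// directly.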
enum BootstrapApplyState {
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    Items(Vec<StateUpdate>),
    Updates(Vec<StateUpdate>),
}

impl BootstrapApplyState {
    fn new(update: StateUpdate) -> BootstrapApplyState {
        match update {
            StateUpdate {
                kind: StateUpdateKind::SystemObjectMapping(system_object_mapping),
                diff: StateDiff::Addition,
                ..
            } if matches!(
                system_object_mapping.description.object_type,
                CatalogItemType::View
            ) =>
            {
                let view_addition = lookup_builtin_view_addition(system_object_mapping);
                BootstrapApplyState::BuiltinViewAdditions(vec![view_addition])
            }
            StateUpdate {
                kind: StateUpdateKind::IntrospectionSourceIndex(_),
                ..
            }
            | StateUpdate {
                kind: StateUpdateKind::SystemObjectMapping(_),
                ..
            }
            | StateUpdate {
                kind: StateUpdateKind::Item(_),
                ..
            } => BootstrapApplyState::Items(vec![update]),
            update => BootstrapApplyState::Updates(vec![update]),
        }
    }

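    /// Applies the accumulated batch to `state`, returning the builtin table updates it
    /// generates.
    ///
    /// Builtin view additions temporarily enable the item-parsing configuration and restore
    /// the previous system configuration afterwards; item updates are applied inside
    /// `with_enable_for_item_parsing`.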
    async fn apply(
        self,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match self {
            BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions) => {
                let restore = state.system_configuration.clone();
                state.system_configuration.enable_for_item_parsing();
                let builtin_table_updates = CatalogState::parse_builtin_views(
                    state,
                    builtin_view_additions,
                    retractions,
                    local_expression_cache,
                )
                .await;
                state.system_configuration = restore;
                builtin_table_updates
            }
            BootstrapApplyState::Items(updates) => state.with_enable_for_item_parsing(|state| {
                state
                    .apply_updates_inner(updates, retractions, local_expression_cache)
                    .expect("corrupt catalog")
            }),
            BootstrapApplyState::Updates(updates) => state
                .apply_updates_inner(updates, retractions, local_expression_cache)
                .expect("corrupt catalog"),
        }
    }

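    /// Folds `next` into the current batch if both are of the same kind; otherwise applies
    /// the current batch to `state` and starts a new batch from `next`.
    ///
    /// Returns the new accumulator along with any builtin table updates produced by applying
    /// the previous batch.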
    async fn step(
        self,
        next: BootstrapApplyState,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        BootstrapApplyState,
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    ) {
        match (self, next) {
            (
                BootstrapApplyState::BuiltinViewAdditions(mut builtin_view_additions),
                BootstrapApplyState::BuiltinViewAdditions(next_builtin_view_additions),
            ) => {
                builtin_view_additions.extend(next_builtin_view_additions);
                (
                    BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions),
                    Vec::new(),
                )
            }
            (BootstrapApplyState::Items(mut updates), BootstrapApplyState::Items(next_updates)) => {
                updates.extend(next_updates);
                (BootstrapApplyState::Items(updates), Vec::new())
            }
            (
                BootstrapApplyState::Updates(mut updates),
                BootstrapApplyState::Updates(next_updates),
            ) => {
                updates.extend(next_updates);
                (BootstrapApplyState::Updates(updates), Vec::new())
            }
            (apply_state, next_apply_state) => {
                let builtin_table_update = apply_state
                    .apply(state, retractions, local_expression_cache)
                    .await;
                (next_apply_state, builtin_table_update)
            }
        }
    }
}

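/// Applies a single addition or retraction to an in-memory lookup map.
///
/// Panics if a retraction does not remove exactly the expected value, or if an addition would
/// overwrite an existing entry.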
fn apply_inverted_lookup<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: V, diff: StateDiff)
where
    K: Ord + Clone + Debug,
    V: PartialEq + Debug,
{
    match diff {
        StateDiff::Retraction => {
            let prev = map.remove(key);
            assert_eq!(
                prev,
                Some(value),
                "retraction does not match existing value: {key:?}"
            );
        }
        StateDiff::Addition => {
            let prev = map.insert(key.clone(), value);
            assert_eq!(
                prev, None,
                "values must be explicitly retracted before inserting a new value: {key:?}"
            );
        }
    }
}

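/// Applies a single addition or retraction of a durable object `D` to the in-memory map that
/// owns the corresponding value.
///
/// Retracted values are stashed in `retractions` rather than dropped, so that a subsequent
/// addition with the same durable key can update the existing in-memory value in place via
/// [`UpdateFrom`] instead of rebuilding it from scratch.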
fn apply_with_update<K, V, D>(
    map: &mut BTreeMap<K, V>,
    durable: D,
    key_fn: impl FnOnce(&D) -> K,
    diff: StateDiff,
    retractions: &mut BTreeMap<D::Key, V>,
) where
    K: Ord,
    V: UpdateFrom<D> + PartialEq + Debug,
    D: DurableType,
    D::Key: Ord,
{
    match diff {
        StateDiff::Retraction => {
            let mem_key = key_fn(&durable);
            let value = map
                .remove(&mem_key)
                .expect("retraction does not match existing value");
            let durable_key = durable.into_key_value().0;
            retractions.insert(durable_key, value);
        }
        StateDiff::Addition => {
            let mem_key = key_fn(&durable);
            let durable_key = durable.key();
            let value = match retractions.remove(&durable_key) {
                Some(mut retraction) => {
                    retraction.update_from(durable);
                    retraction
                }
                None => durable.into(),
            };
            let prev = map.insert(mem_key, value);
            assert_eq!(
                prev, None,
                "values must be explicitly retracted before inserting a new value"
            );
        }
    }
}

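/// Looks up the static [`BuiltinView`] definition referenced by `mapping` and pairs it with
/// the durably assigned [`CatalogItemId`] and [`GlobalId`].
///
/// Panics if the mapping does not refer to a builtin view.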
fn lookup_builtin_view_addition(
    mapping: SystemObjectMapping,
) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
    let (_, builtin) = BUILTIN_LOOKUP
        .get(&mapping.description)
        .expect("missing builtin view");
    let Builtin::View(view) = builtin else {
        unreachable!("programming error, expected BuiltinView, found {builtin:?}");
    };

    (
        view,
        mapping.unique_identifier.catalog_id,
        mapping.unique_identifier.global_id,
    )
}