1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25 BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28 ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29 SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, Func, Index, Log, NetworkPolicy,
35 Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36 TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_compute_types::dataflows::DataflowDescription;
40use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_ore::collections::CollectionExt;
44use mz_ore::tracing::OpenTelemetryContext;
45use mz_ore::{
46 instrument, soft_assert_eq_or_log, soft_assert_no_log, soft_assert_or_log, soft_panic_or_log,
47};
48use mz_pgrepr::oid::INVALID_OID;
49use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
50use mz_repr::role_id::RoleId;
51use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
52use mz_sql::catalog::CatalogError as SqlCatalogError;
53use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
54use mz_sql::names::{
55 FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
56 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
57};
58use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
59use mz_sql::session::vars::{VarError, VarInput};
60use mz_sql::{plan, rbac};
61use mz_sql_parser::ast::Expr;
62use mz_storage_types::sources::Timeline;
63use mz_transform::dataflow::DataflowMetainfo;
64use mz_transform::notice::OptimizerNotice;
65use tracing::{info_span, warn};
66
67use crate::AdapterError;
68use crate::catalog::state::LocalExpressionCache;
69use crate::catalog::{BuiltinTableUpdate, CatalogState};
70use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
71use crate::util::{index_sql, sort_topological};
72
73#[derive(Debug, Clone, Default)]
85struct InProgressRetractions {
86 roles: BTreeMap<RoleKey, Role>,
87 role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
88 databases: BTreeMap<DatabaseKey, Database>,
89 schemas: BTreeMap<SchemaKey, Schema>,
90 clusters: BTreeMap<ClusterKey, Cluster>,
91 network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
92 items: BTreeMap<ItemKey, CatalogEntry>,
93 temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
94 introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
95 system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
96}
97
98impl CatalogState {
99 #[must_use]
103 #[instrument]
104 pub(crate) async fn apply_updates(
105 &mut self,
106 updates: Vec<StateUpdate>,
107 local_expression_cache: &mut LocalExpressionCache,
108 ) -> (
109 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
110 Vec<ParsedStateUpdate>,
111 ) {
112 let mut builtin_table_updates = Vec::with_capacity(updates.len());
113 let mut catalog_updates = Vec::with_capacity(updates.len());
114
115 let updates = Self::consolidate_updates(updates);
120
121 let mut groups: Vec<Vec<_>> = Vec::new();
123 for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
124 let updates = sort_updates(updates.collect());
128 groups.push(updates);
129 }
130
131 for updates in groups {
132 let mut apply_state = ApplyState::Updates(Vec::new());
133 let mut retractions = InProgressRetractions::default();
134
135 for update in updates {
136 let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
137 .step(
138 ApplyState::new(update),
139 self,
140 &mut retractions,
141 local_expression_cache,
142 )
143 .await;
144 apply_state = next_apply_state;
145 builtin_table_updates.extend(builtin_table_update);
146 catalog_updates.extend(catalog_update);
147 }
148
149 let (builtin_table_update, catalog_update) = apply_state
151 .apply(self, &mut retractions, local_expression_cache)
152 .await;
153 builtin_table_updates.extend(builtin_table_update);
154 catalog_updates.extend(catalog_update);
155
156 let dropped_entries: Vec<CatalogEntry> = retractions
159 .items
160 .into_values()
161 .chain(retractions.temp_items.into_values())
162 .collect();
163 if !dropped_entries.is_empty() {
164 let dropped_notices = self.drop_optimizer_notices(dropped_entries);
165 if self.system_config().enable_mz_notices() {
166 self.pack_optimizer_notice_updates(
167 &mut builtin_table_updates,
168 dropped_notices.iter(),
169 Diff::MINUS_ONE,
170 );
171 }
172 }
173 }
174
175 (builtin_table_updates, catalog_updates)
176 }
177
178 fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
186 let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
187 .into_iter()
188 .map(|update| (update.kind, update.ts, update.diff.into()))
189 .collect_vec();
190
191 consolidate_updates(&mut updates);
192
193 updates
194 .into_iter()
195 .map(|(kind, ts, diff)| StateUpdate {
196 kind,
197 ts,
198 diff: diff
199 .try_into()
200 .expect("catalog state cannot have diff other than -1 or 1"),
201 })
202 .collect_vec()
203 }
204
205 #[instrument(level = "debug")]
206 fn apply_updates_inner(
207 &mut self,
208 updates: Vec<StateUpdate>,
209 retractions: &mut InProgressRetractions,
210 local_expression_cache: &mut LocalExpressionCache,
211 ) -> Result<
212 (
213 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
214 Vec<ParsedStateUpdate>,
215 ),
216 CatalogError,
217 > {
218 soft_assert_no_log!(
219 updates.iter().map(|update| update.ts).all_equal(),
220 "all timestamps should be equal: {updates:?}"
221 );
222
223 let mut update_system_config = false;
224
225 let mut builtin_table_updates = Vec::with_capacity(updates.len());
226 let mut catalog_updates = Vec::new();
227
228 for state_update in updates {
229 if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
230 update_system_config = true;
231 }
232
233 match state_update.diff {
234 StateDiff::Retraction => {
235 if let Some(update) =
239 parsed_state_updates::parse_state_update(self, state_update.clone())
240 {
241 catalog_updates.push(update);
242 }
243
244 builtin_table_updates.extend(self.generate_builtin_table_update(
247 state_update.kind.clone(),
248 state_update.diff,
249 ));
250 self.apply_update(
251 state_update.kind,
252 state_update.diff,
253 retractions,
254 local_expression_cache,
255 )?;
256 }
257 StateDiff::Addition => {
258 self.apply_update(
259 state_update.kind.clone(),
260 state_update.diff,
261 retractions,
262 local_expression_cache,
263 )?;
264 builtin_table_updates.extend(self.generate_builtin_table_update(
268 state_update.kind.clone(),
269 state_update.diff,
270 ));
271
272 if let Some(update) =
275 parsed_state_updates::parse_state_update(self, state_update.clone())
276 {
277 catalog_updates.push(update);
278 }
279 }
280 }
281 }
282
283 if update_system_config {
284 self.system_configuration.dyncfg_updates();
285 }
286
287 Ok((builtin_table_updates, catalog_updates))
288 }
289
290 #[instrument(level = "debug")]
291 fn apply_update(
292 &mut self,
293 kind: StateUpdateKind,
294 diff: StateDiff,
295 retractions: &mut InProgressRetractions,
296 local_expression_cache: &mut LocalExpressionCache,
297 ) -> Result<(), CatalogError> {
298 match kind {
299 StateUpdateKind::Role(role) => {
300 self.apply_role_update(role, diff, retractions);
301 }
302 StateUpdateKind::RoleAuth(role_auth) => {
303 self.apply_role_auth_update(role_auth, diff, retractions);
304 }
305 StateUpdateKind::Database(database) => {
306 self.apply_database_update(database, diff, retractions);
307 }
308 StateUpdateKind::Schema(schema) => {
309 self.apply_schema_update(schema, diff, retractions);
310 }
311 StateUpdateKind::DefaultPrivilege(default_privilege) => {
312 self.apply_default_privilege_update(default_privilege, diff, retractions);
313 }
314 StateUpdateKind::SystemPrivilege(system_privilege) => {
315 self.apply_system_privilege_update(system_privilege, diff, retractions);
316 }
317 StateUpdateKind::SystemConfiguration(system_configuration) => {
318 self.apply_system_configuration_update(system_configuration, diff, retractions);
319 }
320 StateUpdateKind::ClusterSystemConfiguration(cfg) => {
321 Self::apply_scoped_system_configuration_update(
322 &mut self.scoped_system_parameters.cluster,
323 cfg.cluster_id,
324 cfg.name,
325 cfg.value,
326 diff,
327 );
328 }
329 StateUpdateKind::ReplicaSystemConfiguration(cfg) => {
330 Self::apply_scoped_system_configuration_update(
331 &mut self.scoped_system_parameters.replica,
332 cfg.replica_id,
333 cfg.name,
334 cfg.value,
335 diff,
336 );
337 }
338 StateUpdateKind::Cluster(cluster) => {
339 self.apply_cluster_update(cluster, diff, retractions);
340 }
341 StateUpdateKind::NetworkPolicy(network_policy) => {
342 self.apply_network_policy_update(network_policy, diff, retractions);
343 }
344 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
345 self.apply_introspection_source_index_update(
346 introspection_source_index,
347 diff,
348 retractions,
349 );
350 }
351 StateUpdateKind::ClusterReplica(cluster_replica) => {
352 self.apply_cluster_replica_update(cluster_replica, diff, retractions);
353 }
354 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
355 self.apply_system_object_mapping_update(
356 system_object_mapping,
357 diff,
358 retractions,
359 local_expression_cache,
360 );
361 }
362 StateUpdateKind::TemporaryItem(item) => {
363 self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
364 }
365 StateUpdateKind::Item(item) => {
366 self.apply_item_update(item, diff, retractions, local_expression_cache)?;
367 }
368 StateUpdateKind::Comment(comment) => {
369 self.apply_comment_update(comment, diff, retractions);
370 }
371 StateUpdateKind::SourceReferences(source_reference) => {
372 self.apply_source_references_update(source_reference, diff, retractions);
373 }
374 StateUpdateKind::AuditLog(_audit_log) => {
375 }
377 StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
378 self.apply_storage_collection_metadata_update(
379 storage_collection_metadata,
380 diff,
381 retractions,
382 );
383 }
384 StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
385 self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
386 }
387 }
388
389 Ok(())
390 }
391
392 #[instrument(level = "debug")]
393 fn apply_role_auth_update(
394 &mut self,
395 role_auth: mz_catalog::durable::RoleAuth,
396 diff: StateDiff,
397 retractions: &mut InProgressRetractions,
398 ) {
399 apply_with_update(
400 &mut self.role_auth_by_id,
401 role_auth,
402 |role_auth| role_auth.role_id,
403 diff,
404 &mut retractions.role_auths,
405 );
406 }
407
408 #[instrument(level = "debug")]
409 fn apply_role_update(
410 &mut self,
411 role: mz_catalog::durable::Role,
412 diff: StateDiff,
413 retractions: &mut InProgressRetractions,
414 ) {
415 apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
416 apply_with_update(
417 &mut self.roles_by_id,
418 role,
419 |role| role.id,
420 diff,
421 &mut retractions.roles,
422 );
423 }
424
425 #[instrument(level = "debug")]
426 fn apply_database_update(
427 &mut self,
428 database: mz_catalog::durable::Database,
429 diff: StateDiff,
430 retractions: &mut InProgressRetractions,
431 ) {
432 apply_inverted_lookup(
433 &mut self.database_by_name,
434 &database.name,
435 database.id,
436 diff,
437 );
438 apply_with_update(
439 &mut self.database_by_id,
440 database,
441 |database| database.id,
442 diff,
443 &mut retractions.databases,
444 );
445 }
446
447 #[instrument(level = "debug")]
448 fn apply_schema_update(
449 &mut self,
450 schema: mz_catalog::durable::Schema,
451 diff: StateDiff,
452 retractions: &mut InProgressRetractions,
453 ) {
454 match &schema.database_id {
455 Some(database_id) => {
456 let db = self
457 .database_by_id
458 .get_mut(database_id)
459 .expect("catalog out of sync");
460 apply_inverted_lookup(&mut db.schemas_by_name, &schema.name, schema.id, diff);
461 apply_with_update(
462 &mut db.schemas_by_id,
463 schema,
464 |schema| schema.id,
465 diff,
466 &mut retractions.schemas,
467 );
468 }
469 None => {
470 apply_inverted_lookup(
471 &mut self.ambient_schemas_by_name,
472 &schema.name,
473 schema.id,
474 diff,
475 );
476 apply_with_update(
477 &mut self.ambient_schemas_by_id,
478 schema,
479 |schema| schema.id,
480 diff,
481 &mut retractions.schemas,
482 );
483 }
484 }
485 }
486
487 #[instrument(level = "debug")]
488 fn apply_default_privilege_update(
489 &mut self,
490 default_privilege: mz_catalog::durable::DefaultPrivilege,
491 diff: StateDiff,
492 _retractions: &mut InProgressRetractions,
493 ) {
494 match diff {
495 StateDiff::Addition => Arc::make_mut(&mut self.default_privileges)
496 .grant(default_privilege.object, default_privilege.acl_item),
497 StateDiff::Retraction => Arc::make_mut(&mut self.default_privileges)
498 .revoke(&default_privilege.object, &default_privilege.acl_item),
499 }
500 }
501
502 #[instrument(level = "debug")]
503 fn apply_system_privilege_update(
504 &mut self,
505 system_privilege: MzAclItem,
506 diff: StateDiff,
507 _retractions: &mut InProgressRetractions,
508 ) {
509 match diff {
510 StateDiff::Addition => {
511 Arc::make_mut(&mut self.system_privileges).grant(system_privilege)
512 }
513 StateDiff::Retraction => {
514 Arc::make_mut(&mut self.system_privileges).revoke(&system_privilege)
515 }
516 }
517 }
518
519 #[instrument(level = "debug")]
520 fn apply_system_configuration_update(
521 &mut self,
522 system_configuration: mz_catalog::durable::SystemConfiguration,
523 diff: StateDiff,
524 _retractions: &mut InProgressRetractions,
525 ) {
526 let res = match diff {
527 StateDiff::Addition => self.insert_system_configuration(
528 &system_configuration.name,
529 VarInput::Flat(&system_configuration.value),
530 ),
531 StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
532 };
533 match res {
534 Ok(_) => (),
535 Err(Error {
539 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
540 }) => {
541 warn!(%name, "unknown system parameter from catalog storage");
542 }
543 Err(e) => panic!("unable to update system variable: {e:?}"),
544 }
545 }
546
547 fn apply_scoped_system_configuration_update<Id: Ord>(
561 map: &mut BTreeMap<Id, BTreeMap<String, String>>,
562 id: Id,
563 name: String,
564 value: String,
565 diff: StateDiff,
566 ) {
567 match diff {
568 StateDiff::Addition => {
569 map.entry(id).or_default().insert(name, value);
570 }
571 StateDiff::Retraction => {
572 if let Some(values) = map.get_mut(&id) {
573 if values.get(&name) == Some(&value) {
574 values.remove(&name);
575 if values.is_empty() {
576 map.remove(&id);
577 }
578 }
579 }
580 }
581 }
582 }
583
584 #[instrument(level = "debug")]
585 fn apply_cluster_update(
586 &mut self,
587 cluster: mz_catalog::durable::Cluster,
588 diff: StateDiff,
589 retractions: &mut InProgressRetractions,
590 ) {
591 if matches!(diff, StateDiff::Addition) {
598 if let mz_catalog::durable::ClusterVariant::Managed(managed) = &cluster.config.variant {
599 if !self.cluster_replica_sizes.0.contains_key(&managed.size) {
600 soft_panic_or_log!(
601 "managed cluster {} ({}) references unknown replica size {:?}; \
602 mz_clusters.disk will resolve to false",
603 cluster.name,
604 cluster.id,
605 managed.size,
606 );
607 }
608 }
609 }
610 apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
611 apply_with_update(
612 &mut self.clusters_by_id,
613 cluster,
614 |cluster| cluster.id,
615 diff,
616 &mut retractions.clusters,
617 );
618 }
619
620 #[instrument(level = "debug")]
621 fn apply_network_policy_update(
622 &mut self,
623 policy: mz_catalog::durable::NetworkPolicy,
624 diff: StateDiff,
625 retractions: &mut InProgressRetractions,
626 ) {
627 apply_inverted_lookup(
628 &mut self.network_policies_by_name,
629 &policy.name,
630 policy.id,
631 diff,
632 );
633 apply_with_update(
634 &mut self.network_policies_by_id,
635 policy,
636 |policy| policy.id,
637 diff,
638 &mut retractions.network_policies,
639 );
640 }
641
642 #[instrument(level = "debug")]
643 fn apply_introspection_source_index_update(
644 &mut self,
645 introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
646 diff: StateDiff,
647 retractions: &mut InProgressRetractions,
648 ) {
649 let cluster = self
650 .clusters_by_id
651 .get_mut(&introspection_source_index.cluster_id)
652 .expect("catalog out of sync");
653 let log = BUILTIN_LOG_LOOKUP
654 .get(introspection_source_index.name.as_str())
655 .expect("missing log");
656 apply_inverted_lookup(
657 &mut cluster.log_indexes,
658 &log.variant,
659 introspection_source_index.index_id,
660 diff,
661 );
662
663 match diff {
664 StateDiff::Addition => {
665 if let Some(mut entry) = retractions
666 .introspection_source_indexes
667 .remove(&introspection_source_index.item_id)
668 {
669 let (index_name, index) = self.create_introspection_source_index(
672 introspection_source_index.cluster_id,
673 log,
674 introspection_source_index.index_id,
675 );
676 assert_eq!(entry.id, introspection_source_index.item_id);
677 assert_eq!(entry.oid, introspection_source_index.oid);
678 assert_eq!(entry.name, index_name);
679 entry.item = index;
680 self.insert_entry(entry);
681 } else {
682 self.insert_introspection_source_index(
683 introspection_source_index.cluster_id,
684 log,
685 introspection_source_index.item_id,
686 introspection_source_index.index_id,
687 introspection_source_index.oid,
688 );
689 }
690 }
691 StateDiff::Retraction => {
692 let entry = self.drop_item(introspection_source_index.item_id);
693 retractions
694 .introspection_source_indexes
695 .insert(entry.id, entry);
696 }
697 }
698 }
699
700 #[instrument(level = "debug")]
701 fn apply_cluster_replica_update(
702 &mut self,
703 cluster_replica: mz_catalog::durable::ClusterReplica,
704 diff: StateDiff,
705 _retractions: &mut InProgressRetractions,
706 ) {
707 let cluster = self
708 .clusters_by_id
709 .get(&cluster_replica.cluster_id)
710 .expect("catalog out of sync");
711
712 if let mz_catalog::durable::ReplicaLocation::Managed { size, .. } =
721 &cluster_replica.config.location
722 {
723 if !self.cluster_replica_sizes.0.contains_key(size) {
724 soft_panic_or_log!(
725 "cluster replica {}.{} ({}) references unknown replica size {:?}; \
726 skipping in-memory registration",
727 cluster.name,
728 cluster_replica.name,
729 cluster_replica.replica_id,
730 size,
731 );
732 return;
733 }
734 }
735
736 let location = self
742 .concretize_replica_location(cluster_replica.config.location, &vec![], None, true)
743 .expect("catalog in unexpected state");
744 let cluster = self
745 .clusters_by_id
746 .get_mut(&cluster_replica.cluster_id)
747 .expect("catalog out of sync");
748 apply_inverted_lookup(
749 &mut cluster.replica_id_by_name_,
750 &cluster_replica.name,
751 cluster_replica.replica_id,
752 diff,
753 );
754 match diff {
755 StateDiff::Retraction => {
756 let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
757 assert!(
758 prev.is_some(),
759 "retraction does not match existing value: {:?}",
760 cluster_replica.replica_id
761 );
762 }
763 StateDiff::Addition => {
764 let logging = ReplicaLogging {
765 log_logging: cluster_replica.config.logging.log_logging,
766 interval: cluster_replica.config.logging.interval,
767 };
768 let config = ReplicaConfig {
769 location,
770 compute: ComputeReplicaConfig { logging },
771 };
772 let mem_cluster_replica = ClusterReplica {
773 name: cluster_replica.name.clone(),
774 cluster_id: cluster_replica.cluster_id,
775 replica_id: cluster_replica.replica_id,
776 config,
777 owner_id: cluster_replica.owner_id,
778 };
779 let prev = cluster
780 .replicas_by_id_
781 .insert(cluster_replica.replica_id, mem_cluster_replica);
782 assert_eq!(
783 prev, None,
784 "values must be explicitly retracted before inserting a new value: {:?}",
785 cluster_replica.replica_id
786 );
787 }
788 }
789 }
790
791 #[instrument(level = "debug")]
792 fn apply_system_object_mapping_update(
793 &mut self,
794 system_object_mapping: mz_catalog::durable::SystemObjectMapping,
795 diff: StateDiff,
796 retractions: &mut InProgressRetractions,
797 local_expression_cache: &mut LocalExpressionCache,
798 ) {
799 let item_id = system_object_mapping.unique_identifier.catalog_id;
800 let global_id = system_object_mapping.unique_identifier.global_id;
801
802 if system_object_mapping.unique_identifier.runtime_alterable() {
803 return;
807 }
808
809 if let StateDiff::Retraction = diff {
810 let entry = self.drop_item(item_id);
811 retractions.system_object_mappings.insert(item_id, entry);
812 return;
813 }
814
815 if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
816 self.insert_entry(entry);
821 return;
822 }
823
824 let builtin = BUILTIN_LOOKUP
825 .get(&system_object_mapping.description)
826 .expect("missing builtin")
827 .1;
828 let schema_name = builtin.schema();
829 let schema_id = self
830 .ambient_schemas_by_name
831 .get(schema_name)
832 .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
833 let name = QualifiedItemName {
834 qualifiers: ItemQualifiers {
835 database_spec: ResolvedDatabaseSpecifier::Ambient,
836 schema_spec: SchemaSpecifier::Id(*schema_id),
837 },
838 item: builtin.name().into(),
839 };
840 match builtin {
841 Builtin::Log(log) => {
842 let mut acl_items = vec![rbac::owner_privilege(
843 mz_sql::catalog::ObjectType::Source,
844 MZ_SYSTEM_ROLE_ID,
845 )];
846 acl_items.extend_from_slice(&log.access);
847 self.insert_item(
848 item_id,
849 log.oid,
850 name.clone(),
851 CatalogItem::Log(Log {
852 variant: log.variant,
853 global_id,
854 }),
855 MZ_SYSTEM_ROLE_ID,
856 PrivilegeMap::from_mz_acl_items(acl_items),
857 );
858 }
859
860 Builtin::Table(table) => {
861 let mut acl_items = vec![rbac::owner_privilege(
862 mz_sql::catalog::ObjectType::Table,
863 MZ_SYSTEM_ROLE_ID,
864 )];
865 acl_items.extend_from_slice(&table.access);
866
867 self.insert_item(
868 item_id,
869 table.oid,
870 name.clone(),
871 CatalogItem::Table(Table {
872 create_sql: None,
873 desc: VersionedRelationDesc::new(table.desc.clone()),
874 collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
875 conn_id: None,
876 resolved_ids: ResolvedIds::empty(),
877 custom_logical_compaction_window: table.is_retained_metrics_object.then(
878 || {
879 self.system_config()
880 .metrics_retention()
881 .try_into()
882 .expect("invalid metrics retention")
883 },
884 ),
885 is_retained_metrics_object: table.is_retained_metrics_object,
886 data_source: TableDataSource::TableWrites {
887 defaults: vec![Expr::null(); table.desc.arity()],
888 },
889 }),
890 MZ_SYSTEM_ROLE_ID,
891 PrivilegeMap::from_mz_acl_items(acl_items),
892 );
893 }
894 Builtin::Index(index) => {
895 let custom_logical_compaction_window =
896 index.is_retained_metrics_object.then(|| {
897 self.system_config()
898 .metrics_retention()
899 .try_into()
900 .expect("invalid metrics retention")
901 });
902 let versions = BTreeMap::new();
904
905 let item = self
906 .parse_item(
907 global_id,
908 &index.create_sql(),
909 &versions,
910 None,
911 index.is_retained_metrics_object,
912 custom_logical_compaction_window,
913 local_expression_cache,
914 None,
915 )
916 .unwrap_or_else(|e| {
917 panic!(
918 "internal error: failed to load bootstrap index:\n\
919 {}\n\
920 error:\n\
921 {:?}\n\n\
922 make sure that the schema name is specified in the builtin index's create sql statement.",
923 index.name, e
924 )
925 });
926 let CatalogItem::Index(_) = item else {
927 panic!(
928 "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
929 index.name
930 );
931 };
932
933 self.insert_item(
934 item_id,
935 index.oid,
936 name,
937 item,
938 MZ_SYSTEM_ROLE_ID,
939 PrivilegeMap::default(),
940 );
941 }
942 Builtin::View(_) => {
943 unreachable!("views added elsewhere");
945 }
946
947 Builtin::Type(typ) => {
949 let typ = self.resolve_builtin_type_references(typ);
950 if let CatalogType::Array { element_reference } = typ.details.typ {
951 let entry = self.get_entry_mut(&element_reference);
952 let item_type = match &mut entry.item {
953 CatalogItem::Type(item_type) => item_type,
954 _ => unreachable!("types can only reference other types"),
955 };
956 item_type.details.array_id = Some(item_id);
957 }
958
959 let schema_id = self.resolve_system_schema(typ.schema);
960
961 self.insert_item(
962 item_id,
963 typ.oid,
964 QualifiedItemName {
965 qualifiers: ItemQualifiers {
966 database_spec: ResolvedDatabaseSpecifier::Ambient,
967 schema_spec: SchemaSpecifier::Id(schema_id),
968 },
969 item: typ.name.to_owned(),
970 },
971 CatalogItem::Type(Type {
972 create_sql: None,
973 global_id,
974 details: typ.details.clone(),
975 resolved_ids: ResolvedIds::empty(),
976 }),
977 MZ_SYSTEM_ROLE_ID,
978 PrivilegeMap::from_mz_acl_items(vec![
979 rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
980 rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
981 ]),
982 );
983 }
984
985 Builtin::Func(func) => {
986 let oid = INVALID_OID;
990 self.insert_item(
991 item_id,
992 oid,
993 name.clone(),
994 CatalogItem::Func(Func {
995 inner: func.inner,
996 global_id,
997 }),
998 MZ_SYSTEM_ROLE_ID,
999 PrivilegeMap::default(),
1000 );
1001 }
1002
1003 Builtin::Source(coll) => {
1004 let mut acl_items = vec![rbac::owner_privilege(
1005 mz_sql::catalog::ObjectType::Source,
1006 MZ_SYSTEM_ROLE_ID,
1007 )];
1008 acl_items.extend_from_slice(&coll.access);
1009
1010 self.insert_item(
1011 item_id,
1012 coll.oid,
1013 name.clone(),
1014 CatalogItem::Source(Source {
1015 create_sql: None,
1016 data_source: coll.data_source.clone(),
1017 desc: coll.desc.clone(),
1018 global_id,
1019 timeline: Timeline::EpochMilliseconds,
1020 resolved_ids: ResolvedIds::empty(),
1021 custom_logical_compaction_window: coll.is_retained_metrics_object.then(
1022 || {
1023 self.system_config()
1024 .metrics_retention()
1025 .try_into()
1026 .expect("invalid metrics retention")
1027 },
1028 ),
1029 is_retained_metrics_object: coll.is_retained_metrics_object,
1030 }),
1031 MZ_SYSTEM_ROLE_ID,
1032 PrivilegeMap::from_mz_acl_items(acl_items),
1033 );
1034 }
1035 Builtin::MaterializedView(mv) => {
1036 let mut acl_items = vec![rbac::owner_privilege(
1037 mz_sql::catalog::ObjectType::MaterializedView,
1038 MZ_SYSTEM_ROLE_ID,
1039 )];
1040 acl_items.extend_from_slice(&mv.access);
1041
1042 let custom_logical_compaction_window = mv.is_retained_metrics_object.then(|| {
1043 self.system_config()
1044 .metrics_retention()
1045 .try_into()
1046 .expect("invalid metrics retention")
1047 });
1048
1049 let versions = BTreeMap::new();
1051
1052 let mut item = self
1053 .parse_item(
1054 global_id,
1055 &mv.create_sql(),
1056 &versions,
1057 None,
1058 mv.is_retained_metrics_object,
1059 custom_logical_compaction_window,
1060 local_expression_cache,
1061 None,
1062 )
1063 .unwrap_or_else(|e| {
1064 panic!(
1065 "internal error: failed to load bootstrap materialized view:\n\
1066 {}\n\
1067 error:\n\
1068 {e:?}\n\n\
1069 make sure that the schema name is specified in the builtin \
1070 materialized view's create sql statement.",
1071 mv.name,
1072 )
1073 });
1074 let CatalogItem::MaterializedView(catalog_mv) = &mut item else {
1075 panic!(
1076 "internal error: builtin materialized view {}'s SQL does not begin \
1077 with \"CREATE MATERIALIZED VIEW\".",
1078 mv.name,
1079 );
1080 };
1081
1082 let mut desc = catalog_mv.desc.latest();
1086 for key in &mv.desc.typ().keys {
1087 desc = desc.with_key(key.clone());
1088 }
1089 catalog_mv.desc = VersionedRelationDesc::new(desc);
1090
1091 self.insert_item(
1092 item_id,
1093 mv.oid,
1094 name,
1095 item,
1096 MZ_SYSTEM_ROLE_ID,
1097 PrivilegeMap::from_mz_acl_items(acl_items),
1098 );
1099 }
1100 Builtin::Connection(connection) => {
1101 let versions = BTreeMap::new();
1103 let mut item = self
1104 .parse_item(
1105 global_id,
1106 connection.sql,
1107 &versions,
1108 None,
1109 false,
1110 None,
1111 local_expression_cache,
1112 None,
1113 )
1114 .unwrap_or_else(|e| {
1115 panic!(
1116 "internal error: failed to load bootstrap connection:\n\
1117 {}\n\
1118 error:\n\
1119 {:?}\n\n\
1120 make sure that the schema name is specified in the builtin connection's create sql statement.",
1121 connection.name, e
1122 )
1123 });
1124 let CatalogItem::Connection(_) = &mut item else {
1125 panic!(
1126 "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
1127 connection.name
1128 );
1129 };
1130
1131 let mut acl_items = vec![rbac::owner_privilege(
1132 mz_sql::catalog::ObjectType::Connection,
1133 connection.owner_id.clone(),
1134 )];
1135 acl_items.extend_from_slice(connection.access);
1136
1137 self.insert_item(
1138 item_id,
1139 connection.oid,
1140 name.clone(),
1141 item,
1142 connection.owner_id.clone(),
1143 PrivilegeMap::from_mz_acl_items(acl_items),
1144 );
1145 }
1146 }
1147 }
1148
1149 #[instrument(level = "debug")]
1150 fn apply_temporary_item_update(
1151 &mut self,
1152 temporary_item: TemporaryItem,
1153 diff: StateDiff,
1154 retractions: &mut InProgressRetractions,
1155 local_expression_cache: &mut LocalExpressionCache,
1156 ) {
1157 match diff {
1158 StateDiff::Addition => {
1159 let TemporaryItem {
1160 id,
1161 oid,
1162 global_id,
1163 schema_id,
1164 name,
1165 conn_id,
1166 create_sql,
1167 owner_id,
1168 privileges,
1169 extra_versions,
1170 } = temporary_item;
1171 let temp_conn_id = conn_id
1174 .as_ref()
1175 .expect("temporary items must have a connection id");
1176 if !self.temporary_schemas.contains_key(temp_conn_id) {
1177 self.create_temporary_schema(temp_conn_id, owner_id)
1178 .expect("failed to create temporary schema");
1179 }
1180 let schema = self.find_temp_schema(&schema_id);
1181 let name = QualifiedItemName {
1182 qualifiers: ItemQualifiers {
1183 database_spec: schema.database().clone(),
1184 schema_spec: schema.id().clone(),
1185 },
1186 item: name.clone(),
1187 };
1188
1189 let entry = match retractions.temp_items.remove(&id) {
1190 Some(mut retraction) => {
1191 assert_eq!(retraction.id, id);
1192
1193 if retraction.create_sql() != create_sql {
1198 let mut catalog_item = self
1199 .deserialize_item(
1200 global_id,
1201 &create_sql,
1202 &extra_versions,
1203 local_expression_cache,
1204 Some(retraction.item),
1205 )
1206 .unwrap_or_else(|e| {
1207 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1208 });
1209 catalog_item.set_conn_id(conn_id);
1216 retraction.item = catalog_item;
1217 }
1218
1219 retraction.id = id;
1220 retraction.oid = oid;
1221 retraction.name = name;
1222 retraction.owner_id = owner_id;
1223 retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1224 retraction
1225 }
1226 None => {
1227 let mut catalog_item = self
1228 .deserialize_item(
1229 global_id,
1230 &create_sql,
1231 &extra_versions,
1232 local_expression_cache,
1233 None,
1234 )
1235 .unwrap_or_else(|e| {
1236 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1237 });
1238
1239 catalog_item.set_conn_id(conn_id);
1245
1246 CatalogEntry {
1247 item: catalog_item,
1248 referenced_by: Vec::new(),
1249 used_by: Vec::new(),
1250 id,
1251 oid,
1252 name,
1253 owner_id,
1254 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1255 }
1256 }
1257 };
1258 self.insert_entry(entry);
1259 }
1260 StateDiff::Retraction => {
1261 let entry = self.drop_item(temporary_item.id);
1262 retractions.temp_items.insert(temporary_item.id, entry);
1263 }
1264 }
1265 }
1266
1267 #[instrument(level = "debug")]
1268 fn apply_item_update(
1269 &mut self,
1270 item: mz_catalog::durable::Item,
1271 diff: StateDiff,
1272 retractions: &mut InProgressRetractions,
1273 local_expression_cache: &mut LocalExpressionCache,
1274 ) -> Result<(), CatalogError> {
1275 match diff {
1276 StateDiff::Addition => {
1277 let key = item.key();
1278 let mz_catalog::durable::Item {
1279 id,
1280 oid,
1281 global_id,
1282 schema_id,
1283 name,
1284 create_sql,
1285 owner_id,
1286 privileges,
1287 extra_versions,
1288 } = item;
1289 let schema = self.find_non_temp_schema(&schema_id);
1290 let name = QualifiedItemName {
1291 qualifiers: ItemQualifiers {
1292 database_spec: schema.database().clone(),
1293 schema_spec: schema.id().clone(),
1294 },
1295 item: name.clone(),
1296 };
1297 let entry = match retractions.items.remove(&key) {
1298 Some(retraction) => {
1299 assert_eq!(retraction.id, item.id);
1300
1301 let item = self
1302 .deserialize_item(
1303 global_id,
1304 &create_sql,
1305 &extra_versions,
1306 local_expression_cache,
1307 Some(retraction.item),
1308 )
1309 .unwrap_or_else(|e| {
1310 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1311 });
1312
1313 CatalogEntry {
1314 item,
1315 id,
1316 oid,
1317 name,
1318 owner_id,
1319 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1320 referenced_by: retraction.referenced_by,
1321 used_by: retraction.used_by,
1322 }
1323 }
1324 None => {
1325 let catalog_item = self
1326 .deserialize_item(
1327 global_id,
1328 &create_sql,
1329 &extra_versions,
1330 local_expression_cache,
1331 None,
1332 )
1333 .unwrap_or_else(|e| {
1334 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1335 });
1336 CatalogEntry {
1337 item: catalog_item,
1338 referenced_by: Vec::new(),
1339 used_by: Vec::new(),
1340 id,
1341 oid,
1342 name,
1343 owner_id,
1344 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1345 }
1346 }
1347 };
1348
1349 self.insert_entry(entry);
1350 }
1351 StateDiff::Retraction => {
1352 let entry = self.drop_item(item.id);
1353 let key = item.into_key_value().0;
1354 retractions.items.insert(key, entry);
1355 }
1356 }
1357 Ok(())
1358 }
1359
1360 #[instrument(level = "debug")]
1361 fn apply_comment_update(
1362 &mut self,
1363 comment: mz_catalog::durable::Comment,
1364 diff: StateDiff,
1365 _retractions: &mut InProgressRetractions,
1366 ) {
1367 match diff {
1368 StateDiff::Addition => {
1369 let prev = Arc::make_mut(&mut self.comments).update_comment(
1370 comment.object_id,
1371 comment.sub_component,
1372 Some(comment.comment),
1373 );
1374 assert_eq!(
1375 prev, None,
1376 "values must be explicitly retracted before inserting a new value"
1377 );
1378 }
1379 StateDiff::Retraction => {
1380 let prev = Arc::make_mut(&mut self.comments).update_comment(
1381 comment.object_id,
1382 comment.sub_component,
1383 None,
1384 );
1385 assert_eq!(
1386 prev,
1387 Some(comment.comment),
1388 "retraction does not match existing value: ({:?}, {:?})",
1389 comment.object_id,
1390 comment.sub_component,
1391 );
1392 }
1393 }
1394 }
1395
1396 #[instrument(level = "debug")]
1397 fn apply_source_references_update(
1398 &mut self,
1399 source_references: mz_catalog::durable::SourceReferences,
1400 diff: StateDiff,
1401 _retractions: &mut InProgressRetractions,
1402 ) {
1403 match diff {
1404 StateDiff::Addition => {
1405 let prev = self
1406 .source_references
1407 .insert(source_references.source_id, source_references.into());
1408 assert!(
1409 prev.is_none(),
1410 "values must be explicitly retracted before inserting a new value: {prev:?}"
1411 );
1412 }
1413 StateDiff::Retraction => {
1414 let prev = self.source_references.remove(&source_references.source_id);
1415 assert!(
1416 prev.is_some(),
1417 "retraction for a non-existent existing value: {source_references:?}"
1418 );
1419 }
1420 }
1421 }
1422
1423 #[instrument(level = "debug")]
1424 fn apply_storage_collection_metadata_update(
1425 &mut self,
1426 storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1427 diff: StateDiff,
1428 _retractions: &mut InProgressRetractions,
1429 ) {
1430 apply_inverted_lookup(
1431 &mut Arc::make_mut(&mut self.storage_metadata).collection_metadata,
1432 &storage_collection_metadata.id,
1433 storage_collection_metadata.shard,
1434 diff,
1435 );
1436 }
1437
1438 #[instrument(level = "debug")]
1439 fn apply_unfinalized_shard_update(
1440 &mut self,
1441 unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1442 diff: StateDiff,
1443 _retractions: &mut InProgressRetractions,
1444 ) {
1445 match diff {
1446 StateDiff::Addition => {
1447 let newly_inserted = Arc::make_mut(&mut self.storage_metadata)
1448 .unfinalized_shards
1449 .insert(unfinalized_shard.shard);
1450 assert!(
1451 newly_inserted,
1452 "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1453 );
1454 }
1455 StateDiff::Retraction => {
1456 let removed = Arc::make_mut(&mut self.storage_metadata)
1457 .unfinalized_shards
1458 .remove(&unfinalized_shard.shard);
1459 assert!(
1460 removed,
1461 "retraction does not match existing value: {unfinalized_shard:?}"
1462 );
1463 }
1464 }
1465 }
1466
1467 #[instrument]
1470 pub(crate) fn generate_builtin_table_updates(
1471 &self,
1472 updates: Vec<StateUpdate>,
1473 ) -> Vec<BuiltinTableUpdate> {
1474 let mut builtin_table_updates = Vec::new();
1475 for StateUpdate { kind, ts: _, diff } in updates {
1476 let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1477 let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1478 builtin_table_updates.extend(builtin_table_update);
1479 }
1480 builtin_table_updates
1481 }
1482
1483 #[instrument(level = "debug")]
1486 pub(crate) fn generate_builtin_table_update(
1487 &self,
1488 kind: StateUpdateKind,
1489 diff: StateDiff,
1490 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1491 let diff = diff.into();
1492 match kind {
1493 StateUpdateKind::Role(_) => Vec::new(),
1497 StateUpdateKind::RoleAuth(role_auth) => {
1498 vec![self.pack_role_auth_update(role_auth.role_id, diff)]
1499 }
1500 StateUpdateKind::DefaultPrivilege(_) => Vec::new(),
1504 StateUpdateKind::SystemPrivilege(_) => Vec::new(),
1505 StateUpdateKind::SystemConfiguration(_) => Vec::new(),
1506 StateUpdateKind::ClusterSystemConfiguration(_) => Vec::new(),
1512 StateUpdateKind::ReplicaSystemConfiguration(_) => Vec::new(),
1513 StateUpdateKind::Cluster(_) => Vec::new(),
1517 StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
1518 self.pack_item_update(introspection_source_index.item_id, diff)
1519 }
1520 StateUpdateKind::ClusterReplica(_) => Vec::new(),
1523 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
1524 if !system_object_mapping.unique_identifier.runtime_alterable() {
1528 self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
1529 } else {
1530 vec![]
1531 }
1532 }
1533 StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
1534 StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
1535 StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
1536 comment.object_id,
1537 comment.sub_component,
1538 &comment.comment,
1539 diff,
1540 )],
1541 StateUpdateKind::SourceReferences(source_references) => {
1542 self.pack_source_references_update(&source_references, diff)
1543 }
1544 StateUpdateKind::AuditLog(audit_log) => {
1545 vec![
1546 self.pack_audit_log_update(&audit_log.event, diff)
1547 .expect("could not pack audit log update"),
1548 ]
1549 }
1550 StateUpdateKind::Database(_)
1551 | StateUpdateKind::Schema(_)
1552 | StateUpdateKind::NetworkPolicy(_)
1553 | StateUpdateKind::StorageCollectionMetadata(_)
1554 | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
1555 }
1556 }
1557
1558 fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1559 self.entry_by_id
1560 .get_mut(id)
1561 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1562 }
1563
1564 pub(super) fn set_optimized_plan(
1569 &mut self,
1570 id: GlobalId,
1571 plan: DataflowDescription<mz_expr::OptimizedMirRelationExpr>,
1572 ) {
1573 let item_id = self.entry_by_global_id[&id];
1574 let entry = self.get_entry_mut(&item_id);
1575 match entry.item_mut() {
1576 CatalogItem::Index(idx) => idx.optimized_plan = Some(Arc::new(plan)),
1577 CatalogItem::MaterializedView(mv) => mv.optimized_plan = Some(Arc::new(plan)),
1578 other => panic!("set_optimized_plan called on {} ({:?})", id, other.typ()),
1579 }
1580 }
1581
1582 pub(super) fn set_physical_plan(
1587 &mut self,
1588 id: GlobalId,
1589 plan: DataflowDescription<mz_compute_types::plan::Plan>,
1590 ) {
1591 let item_id = self.entry_by_global_id[&id];
1592 let entry = self.get_entry_mut(&item_id);
1593 match entry.item_mut() {
1594 CatalogItem::Index(idx) => idx.physical_plan = Some(Arc::new(plan)),
1595 CatalogItem::MaterializedView(mv) => mv.physical_plan = Some(Arc::new(plan)),
1596 other => panic!("set_physical_plan called on {} ({:?})", id, other.typ()),
1597 }
1598 }
1599
1600 pub(super) fn set_dataflow_metainfo(
1605 &mut self,
1606 id: GlobalId,
1607 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
1608 ) {
1609 for notice in metainfo.optimizer_notices.iter() {
1611 for dep_id in notice.dependencies.iter() {
1612 self.notices_by_dep_id
1613 .entry(*dep_id)
1614 .or_default()
1615 .push(Arc::clone(notice));
1616 }
1617 if let Some(item_id) = notice.item_id {
1618 soft_assert_eq_or_log!(
1619 item_id,
1620 id,
1621 "notice.item_id should match the id for whom we are saving the notice"
1622 );
1623 }
1624 }
1625 let item_id = self.entry_by_global_id[&id];
1627 let entry = self.get_entry_mut(&item_id);
1628 match entry.item_mut() {
1629 CatalogItem::Index(idx) => idx.dataflow_metainfo = Some(metainfo),
1630 CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo = Some(metainfo),
1631 other => panic!("set_dataflow_metainfo called on {} ({:?})", id, other.typ()),
1632 }
1633 }
1634
    /// Removes all optimizer notices connected to `dropped_entries` from the in-memory state
    /// and returns the set of notices that were removed.
    ///
    /// A notice is removed when either
    /// 1. it is stored in the `dataflow_metainfo` of a dropped index or materialized view, or
    /// 2. it is registered in `notices_by_dep_id` under a global id of a dropped entry.
    ///
    /// Removed notices are also purged from the `notices_by_dep_id` lists of their remaining
    /// dependencies, and lists that become empty are deleted.
    #[mz_ore::instrument(level = "trace")]
    pub(super) fn drop_optimizer_notices(
        &mut self,
        dropped_entries: Vec<CatalogEntry>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        // Every notice removed by this call; this is the return value.
        let mut dropped_notices = BTreeSet::new();
        // Global ids of all dropped entries.
        let mut drop_ids = BTreeSet::new();

        // Phase 1: notices stored on the dropped entries themselves.
        for mut entry in dropped_entries {
            drop_ids.extend(entry.global_ids());
            // Only indexes and materialized views carry dataflow metainfo; `take` leaves
            // `None` behind on the dropped entry.
            let metainfo = match entry.item_mut() {
                CatalogItem::Index(idx) => idx.dataflow_metainfo.take(),
                CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.take(),
                _ => None,
            };
            if let Some(mut metainfo) = metainfo {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by \
                     `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    // Unregister the notice from each dependency it was indexed under.
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.notices_by_dep_id.get_mut(dep_id) {
                            notices.retain(|x| &n != x);
                            if notices.is_empty() {
                                self.notices_by_dep_id.remove(dep_id);
                            }
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Phase 2: notices that depend on a dropped id. Their whole `notices_by_dep_id` list
        // is removed, and each notice is also deleted from the item it is attached to.
        for id in &drop_ids {
            if let Some(notices) = self.notices_by_dep_id.remove(id) {
                for n in notices.into_iter() {
                    if let Some(item_id) = n.item_id.as_ref() {
                        // The owning item may no longer be in the catalog (for example, it
                        // may be one of the dropped entries); then there is nothing to update.
                        if let Some(entry) = self.try_get_entry_by_global_id(item_id) {
                            let catalog_item_id = entry.id();
                            let entry = self.get_entry_mut(&catalog_item_id);
                            let item = entry.item_mut();
                            match item {
                                CatalogItem::Index(idx) => {
                                    if let Some(ref mut m) = idx.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                CatalogItem::MaterializedView(mv) => {
                                    if let Some(ref mut m) = mv.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                _ => {}
                            }
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Phase 3: dropped notices may still be registered under dependencies that were not
        // dropped themselves. Collect those dependency ids and purge the notices from them.
        let todo_dep_ids: BTreeSet<GlobalId> = dropped_notices
            .iter()
            .flat_map(|n| n.dependencies.iter())
            .filter(|dep_id| !drop_ids.contains(dep_id))
            .copied()
            .collect();
        for id in todo_dep_ids {
            if let Some(notices) = self.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n));
                if notices.is_empty() {
                    self.notices_by_dep_id.remove(&id);
                }
            }
        }

        dropped_notices
    }
1737
1738 fn get_schema_mut(
1739 &mut self,
1740 database_spec: &ResolvedDatabaseSpecifier,
1741 schema_spec: &SchemaSpecifier,
1742 conn_id: &ConnectionId,
1743 ) -> &mut Schema {
1744 match (database_spec, schema_spec) {
1746 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1747 .temporary_schemas
1748 .get_mut(conn_id)
1749 .expect("catalog out of sync"),
1750 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1751 .ambient_schemas_by_id
1752 .get_mut(id)
1753 .expect("catalog out of sync"),
1754 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1755 .database_by_id
1756 .get_mut(database_id)
1757 .expect("catalog out of sync")
1758 .schemas_by_id
1759 .get_mut(schema_id)
1760 .expect("catalog out of sync"),
1761 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1762 unreachable!("temporary schemas are in the ambient database")
1763 }
1764 }
1765 }
1766
    /// Installs the given builtin views into `state` and returns the builtin table updates
    /// that describe them.
    ///
    /// Views that have a parked retraction in `retractions.system_object_mappings` are
    /// re-inserted as-is. All other views are parsed from their `create_sql` on blocking
    /// tasks, each working on a snapshot (clone) of `state`. Because a view may reference
    /// another builtin view that has not been installed yet, a failed parse is not fatal:
    ///
    /// * A failure naming a missing dependency (by id or by name) parks the view until that
    ///   dependency has been installed, or re-queues it immediately if it already has been.
    /// * An `InvalidCast` failure parks the view in `awaiting_all`; such views are retried
    ///   only once nothing else is ready or in flight.
    /// * Any other failure panics.
    ///
    /// Cached expressions taken from `local_expression_cache` for a parse attempt are put
    /// back when the attempt fails with one of the retryable errors above.
    ///
    /// # Panics
    ///
    /// Panics if a view fails to parse with a non-retryable error, if its ambient schema is
    /// unknown, or if any view is still waiting once no more progress can be made.
    #[instrument(name = "catalog::parse_views")]
    async fn parse_builtin_views(
        state: &mut CatalogState,
        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
        // Split the views into those with a parked retraction (`updates`, already parsed
        // entries) and those that must be parsed from scratch (`additions`).
        let (updates, additions): (Vec<_>, Vec<_>) =
            builtin_views
                .into_iter()
                .partition_map(|(view, item_id, gid)| {
                    match retractions.system_object_mappings.remove(&item_id) {
                        Some(entry) => Either::Left(entry),
                        None => Either::Right((view, item_id, gid)),
                    }
                });

        // Parked entries are re-inserted without re-parsing.
        for entry in updates {
            let item_id = entry.id();
            state.insert_entry(entry);
            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
        }

        // In-flight parse tasks.
        let mut handles = Vec::new();
        // Views waiting for a dependency identified by item id.
        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
            BTreeMap::new();
        // Views waiting for a dependency identified by full name.
        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
        // Views that failed with `InvalidCast`; retried only when nothing else can progress.
        let mut awaiting_all = Vec::new();
        // Ids of the views installed so far.
        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
        // Full names of the views installed so far.
        let mut completed_names: BTreeSet<String> = BTreeSet::new();

        // Views that still have to be installed, keyed by item id.
        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
            .into_iter()
            .map(|(view, item_id, gid)| (item_id, (view, gid)))
            .collect();
        // Remembered up front because `views` is drained as views complete.
        let item_ids: Vec<_> = views.keys().copied().collect();

        // Initially every view is attempted.
        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
            // Nothing in flight and nothing ready: retry the views parked in `awaiting_all`.
            if handles.is_empty() && ready.is_empty() {
                ready.extend(awaiting_all.drain(..));
            }

            if !ready.is_empty() {
                // One snapshot of the state is shared by all tasks spawned in this batch.
                let spawn_state = Arc::new(state.clone());
                while let Some(id) = ready.pop_front() {
                    let (view, global_id) = views.get(&id).expect("must exist");
                    let global_id = *global_id;
                    let create_sql = view.create_sql();
                    let versions = BTreeMap::new();

                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
                    let task_state = Arc::clone(&spawn_state);
                    // Hand the cached expression (if any) to the task; it is re-inserted
                    // below if the attempt fails with a retryable error.
                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
                    let handle = mz_ore::task::spawn_blocking(
                        || "parse view",
                        move || {
                            span.in_scope(|| {
                                let res = task_state.parse_item_inner(
                                    global_id,
                                    &create_sql,
                                    &versions,
                                    None,
                                    false,
                                    None,
                                    cached_expr,
                                    None,
                                );
                                (id, global_id, res)
                            })
                        },
                    );
                    handles.push(handle);
                }
            }

            // Wait for whichever task finishes first. `handles` is non-empty here: the loop
            // condition and the two blocks above guarantee at least one spawned task.
            let (selected, _idx, remaining) = future::select_all(handles).await;
            handles = remaining;
            let (id, global_id, res) = selected;
            // Puts a cached expression back so that a later retry can use it.
            let mut insert_cached_expr = |cached_expr| {
                if let Some(cached_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
                }
            };
            match res {
                Ok((item, uncached_expr)) => {
                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                        local_expression_cache.insert_uncached_expression(
                            global_id,
                            uncached_expr,
                            optimizer_features,
                        );
                    }
                    // The view parsed successfully; install it in the real state.
                    let (view, _gid) = views.remove(&id).expect("must exist");
                    let schema_id = state
                        .ambient_schemas_by_name
                        .get(view.schema)
                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
                    let qname = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(*schema_id),
                        },
                        item: view.name.into(),
                    };
                    // Owner privilege for the system role plus the view's declared access.
                    let mut acl_items = vec![rbac::owner_privilege(
                        mz_sql::catalog::ObjectType::View,
                        MZ_SYSTEM_ROLE_ID,
                    )];
                    acl_items.extend_from_slice(&view.access);

                    state.insert_item(
                        id,
                        view.oid,
                        qname,
                        item,
                        MZ_SYSTEM_ROLE_ID,
                        PrivilegeMap::from_mz_acl_items(acl_items),
                    );

                    // Wake up every view that was waiting for this one, by id or by name.
                    let mut resolved_dependent_items = Vec::new();
                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    let entry = state.get_entry(&id);
                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    ready.extend(resolved_dependent_items);

                    completed_ids.insert(id);
                    completed_names.insert(full_name);
                }
                // Missing dependency reported by id.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    // If the dependency has been installed in the meantime, retry right away;
                    // otherwise park the view until it is.
                    if completed_ids.contains(&missing_dep) {
                        ready.push_back(id);
                    } else {
                        awaiting_id_dependencies
                            .entry(missing_dep)
                            .or_default()
                            .push(id);
                    }
                }
                // Missing dependency reported as a string: either an item id or a name.
                Err((
                    AdapterError::PlanError(plan::PlanError::Catalog(
                        SqlCatalogError::UnknownItem(missing_dep),
                    )),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    match CatalogItemId::from_str(&missing_dep) {
                        // The string parses as an item id.
                        Ok(missing_dep) => {
                            if completed_ids.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_id_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                        // Otherwise treat the string as a name.
                        Err(_) => {
                            if completed_names.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_name_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                    }
                }
                // The error does not name a dependency to wait for, so retry the view once
                // nothing else is ready or in flight.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    awaiting_all.push(id);
                }
                // Any other error is treated as unrecoverable.
                Err((e, _)) => {
                    let (bad_view, _gid) = views.get(&id).expect("must exist");
                    panic!(
                        "internal error: failed to load bootstrap view:\n\
                            {name}\n\
                            error:\n\
                            {e:?}\n\n\
                            Make sure that the schema name is specified in the builtin view's create sql statement.
                            ",
                        name = bad_view.name,
                    )
                }
            }
        }

        // Once the loop ends, every view must have been installed.
        assert!(awaiting_id_dependencies.is_empty());
        assert!(
            awaiting_name_dependencies.is_empty(),
            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
        );
        assert!(awaiting_all.is_empty());
        assert!(views.is_empty());

        // Pack the updates for the freshly parsed views, in item id order.
        builtin_table_updates.extend(
            item_ids
                .into_iter()
                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
        );

        builtin_table_updates
    }
2011
2012 fn insert_entry(&mut self, entry: CatalogEntry) {
2014 if !entry.id.is_system() {
2015 if let Some(cluster_id) = entry.item.cluster_id() {
2016 self.clusters_by_id
2017 .get_mut(&cluster_id)
2018 .expect("catalog out of sync")
2019 .bound_objects
2020 .insert(entry.id);
2021 };
2022 }
2023
2024 for u in entry.references().items() {
2025 match self.entry_by_id.get_mut(u) {
2026 Some(metadata) => metadata.referenced_by.push(entry.id()),
2027 None => panic!(
2028 "Catalog: missing dependent catalog item {} while installing {}",
2029 &u,
2030 self.resolve_full_name(entry.name(), entry.conn_id())
2031 ),
2032 }
2033 }
2034 for u in entry.uses() {
2035 if u == entry.id() {
2038 continue;
2039 }
2040 match self.entry_by_id.get_mut(&u) {
2041 Some(metadata) => metadata.used_by.push(entry.id()),
2042 None => panic!(
2043 "Catalog: missing dependent catalog item {} while installing {}",
2044 &u,
2045 self.resolve_full_name(entry.name(), entry.conn_id())
2046 ),
2047 }
2048 }
2049 for gid in entry.item.global_ids() {
2050 self.entry_by_global_id.insert(gid, entry.id());
2051 }
2052 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
2053 if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
2056 && !self.temporary_schemas.contains_key(conn_id)
2057 {
2058 self.create_temporary_schema(conn_id, entry.owner_id)
2059 .expect("failed to create temporary schema");
2060 }
2061 let schema = self.get_schema_mut(
2062 &entry.name().qualifiers.database_spec,
2063 &entry.name().qualifiers.schema_spec,
2064 conn_id,
2065 );
2066
2067 let prev_id = match entry.item() {
2068 CatalogItem::Func(_) => schema
2069 .functions
2070 .insert(entry.name().item.clone(), entry.id()),
2071 CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
2072 _ => schema.items.insert(entry.name().item.clone(), entry.id()),
2073 };
2074
2075 assert!(
2076 prev_id.is_none(),
2077 "builtin name collision on {:?}",
2078 entry.name().item.clone()
2079 );
2080
2081 self.entry_by_id.insert(entry.id(), entry.clone());
2082 }
2083
2084 fn insert_item(
2086 &mut self,
2087 id: CatalogItemId,
2088 oid: u32,
2089 name: QualifiedItemName,
2090 item: CatalogItem,
2091 owner_id: RoleId,
2092 privileges: PrivilegeMap,
2093 ) {
2094 let entry = CatalogEntry {
2095 item,
2096 name,
2097 id,
2098 oid,
2099 used_by: Vec::new(),
2100 referenced_by: Vec::new(),
2101 owner_id,
2102 privileges,
2103 };
2104
2105 self.insert_entry(entry);
2106 }
2107
2108 #[mz_ore::instrument(level = "trace")]
2109 fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
2110 let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
2111 for u in metadata.references().items() {
2112 if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
2113 dep_metadata.referenced_by.retain(|u| *u != metadata.id())
2114 }
2115 }
2116 for u in metadata.uses() {
2117 if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
2118 dep_metadata.used_by.retain(|u| *u != metadata.id())
2119 }
2120 }
2121 for gid in metadata.global_ids() {
2122 self.entry_by_global_id.remove(&gid);
2123 }
2124
2125 let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
2126 let schema = self.get_schema_mut(
2127 &metadata.name().qualifiers.database_spec,
2128 &metadata.name().qualifiers.schema_spec,
2129 conn_id,
2130 );
2131 if metadata.item_type() == CatalogItemType::Type {
2132 schema
2133 .types
2134 .remove(&metadata.name().item)
2135 .expect("catalog out of sync");
2136 } else {
2137 assert_ne!(metadata.item_type(), CatalogItemType::Func);
2140
2141 schema
2142 .items
2143 .remove(&metadata.name().item)
2144 .expect("catalog out of sync");
2145 };
2146
2147 if !id.is_system() {
2148 if let Some(cluster_id) = metadata.item().cluster_id() {
2149 assert!(
2150 self.clusters_by_id
2151 .get_mut(&cluster_id)
2152 .expect("catalog out of sync")
2153 .bound_objects
2154 .remove(&id),
2155 "catalog out of sync"
2156 );
2157 }
2158 }
2159
2160 metadata
2161 }
2162
2163 fn insert_introspection_source_index(
2164 &mut self,
2165 cluster_id: ClusterId,
2166 log: &'static BuiltinLog,
2167 item_id: CatalogItemId,
2168 global_id: GlobalId,
2169 oid: u32,
2170 ) {
2171 let (index_name, index) =
2172 self.create_introspection_source_index(cluster_id, log, global_id);
2173 self.insert_item(
2174 item_id,
2175 oid,
2176 index_name,
2177 index,
2178 MZ_SYSTEM_ROLE_ID,
2179 PrivilegeMap::default(),
2180 );
2181 }
2182
2183 fn create_introspection_source_index(
2184 &self,
2185 cluster_id: ClusterId,
2186 log: &'static BuiltinLog,
2187 global_id: GlobalId,
2188 ) -> (QualifiedItemName, CatalogItem) {
2189 let source_name = FullItemName {
2190 database: RawDatabaseSpecifier::Ambient,
2191 schema: log.schema.into(),
2192 item: log.name.into(),
2193 };
2194 let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
2195 let mut index_name = QualifiedItemName {
2196 qualifiers: ItemQualifiers {
2197 database_spec: ResolvedDatabaseSpecifier::Ambient,
2198 schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
2199 },
2200 item: index_name.clone(),
2201 };
2202 index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
2203 let index_item_name = index_name.item.clone();
2204 let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
2205 let index = CatalogItem::Index(Index {
2206 global_id,
2207 on: log_global_id,
2208 keys: log
2209 .variant
2210 .index_by()
2211 .into_iter()
2212 .map(MirScalarExpr::column)
2213 .collect(),
2214 create_sql: index_sql(
2215 index_item_name,
2216 cluster_id,
2217 source_name,
2218 &log.variant.desc(),
2219 &log.variant.index_by(),
2220 ),
2221 conn_id: None,
2222 resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
2223 cluster_id,
2224 is_retained_metrics_object: false,
2225 custom_logical_compaction_window: None,
2226 optimized_plan: None,
2227 physical_plan: None,
2228 dataflow_metainfo: None,
2229 });
2230 (index_name, index)
2231 }
2232
2233 fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
2238 Ok(Arc::make_mut(&mut self.system_configuration).set(name, value)?)
2239 }
2240
2241 fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
2246 Ok(Arc::make_mut(&mut self.system_configuration).reset(name)?)
2247 }
2248}
2249
2250fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
2258 fn push_update<T>(
2259 update: T,
2260 diff: StateDiff,
2261 retractions: &mut Vec<T>,
2262 additions: &mut Vec<T>,
2263 ) {
2264 match diff {
2265 StateDiff::Retraction => retractions.push(update),
2266 StateDiff::Addition => additions.push(update),
2267 }
2268 }
2269
2270 soft_assert_no_log!(
2271 updates.iter().map(|update| update.ts).all_equal(),
2272 "all timestamps should be equal: {updates:?}"
2273 );
2274 soft_assert_no_log!(
2275 {
2276 let mut dedup = BTreeSet::new();
2277 updates.iter().all(|update| dedup.insert(&update.kind))
2278 },
2279 "updates should be consolidated: {updates:?}"
2280 );
2281
2282 let mut pre_cluster_retractions = Vec::new();
2284 let mut pre_cluster_additions = Vec::new();
2285 let mut cluster_retractions = Vec::new();
2286 let mut cluster_additions = Vec::new();
2287 let mut builtin_item_updates = Vec::new();
2288 let mut item_retractions = Vec::new();
2289 let mut item_additions = Vec::new();
2290 let mut temp_item_retractions = Vec::new();
2291 let mut temp_item_additions = Vec::new();
2292 let mut post_item_retractions = Vec::new();
2293 let mut post_item_additions = Vec::new();
2294 for update in updates {
2295 let diff = update.diff.clone();
2296 match update.kind {
2297 StateUpdateKind::Role(_)
2298 | StateUpdateKind::RoleAuth(_)
2299 | StateUpdateKind::Database(_)
2300 | StateUpdateKind::Schema(_)
2301 | StateUpdateKind::DefaultPrivilege(_)
2302 | StateUpdateKind::SystemPrivilege(_)
2303 | StateUpdateKind::SystemConfiguration(_)
2304 | StateUpdateKind::NetworkPolicy(_) => push_update(
2305 update,
2306 diff,
2307 &mut pre_cluster_retractions,
2308 &mut pre_cluster_additions,
2309 ),
2310 StateUpdateKind::Cluster(_)
2311 | StateUpdateKind::ClusterSystemConfiguration(_)
2312 | StateUpdateKind::IntrospectionSourceIndex(_)
2313 | StateUpdateKind::ClusterReplica(_)
2314 | StateUpdateKind::ReplicaSystemConfiguration(_) => push_update(
2315 update,
2316 diff,
2317 &mut cluster_retractions,
2318 &mut cluster_additions,
2319 ),
2320 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
2321 builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
2322 }
2323 StateUpdateKind::TemporaryItem(item) => push_update(
2324 (item, update.ts, update.diff),
2325 diff,
2326 &mut temp_item_retractions,
2327 &mut temp_item_additions,
2328 ),
2329 StateUpdateKind::Item(item) => push_update(
2330 (item, update.ts, update.diff),
2331 diff,
2332 &mut item_retractions,
2333 &mut item_additions,
2334 ),
2335 StateUpdateKind::Comment(_)
2336 | StateUpdateKind::SourceReferences(_)
2337 | StateUpdateKind::AuditLog(_)
2338 | StateUpdateKind::StorageCollectionMetadata(_)
2339 | StateUpdateKind::UnfinalizedShard(_) => push_update(
2340 update,
2341 diff,
2342 &mut post_item_retractions,
2343 &mut post_item_additions,
2344 ),
2345 }
2346 }
2347
2348 let builtin_item_updates = builtin_item_updates
2351 .into_iter()
2352 .map(|(system_object_mapping, ts, diff)| {
2353 let idx = BUILTIN_LOOKUP
2354 .get(&system_object_mapping.description)
2355 .expect("missing builtin")
2356 .0;
2357 (idx, system_object_mapping, ts, diff)
2358 })
2359 .sorted_by_key(|(idx, _, _, _)| *idx)
2360 .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2361
2362 let mut builtin_source_retractions = Vec::new();
2366 let mut builtin_source_additions = Vec::new();
2367 let mut other_builtin_retractions = Vec::new();
2368 let mut other_builtin_additions = Vec::new();
2369 for (builtin_item_update, ts, diff) in builtin_item_updates {
2370 let object_type = builtin_item_update.description.object_type;
2371 let update = StateUpdate {
2372 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2373 ts,
2374 diff,
2375 };
2376 if object_type == CatalogItemType::Source {
2377 push_update(
2378 update,
2379 diff,
2380 &mut builtin_source_retractions,
2381 &mut builtin_source_additions,
2382 );
2383 } else {
2384 push_update(
2385 update,
2386 diff,
2387 &mut other_builtin_retractions,
2388 &mut other_builtin_additions,
2389 );
2390 }
2391 }
2392
2393 fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2399 tracing::debug!(?items, "sorting items by dependencies");
2400
2401 let key_fn = |item: &(mz_catalog::durable::Item, _, _)| item.0.id;
2402 let dependencies_fn = |item: &(mz_catalog::durable::Item, _, _)| {
2403 let statement = mz_sql::parse::parse(&item.0.create_sql)
2404 .expect("valid create_sql")
2405 .into_element()
2406 .ast;
2407 mz_sql::names::dependencies(&statement).expect("failed to find dependencies of item")
2408 };
2409 sort_topological(items, key_fn, dependencies_fn);
2410 }
2411
2412 fn sort_item_updates(
2428 item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2429 ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2430 let mut types = Vec::new();
2433 let mut funcs = Vec::new();
2436 let mut secrets = Vec::new();
2437 let mut connections = Vec::new();
2438 let mut sources = Vec::new();
2439 let mut tables = Vec::new();
2440 let mut derived_items = Vec::new();
2441 let mut sinks = Vec::new();
2442 for update in item_updates {
2443 match update.0.item_type() {
2444 CatalogItemType::Type => types.push(update),
2445 CatalogItemType::Func => funcs.push(update),
2446 CatalogItemType::Secret => secrets.push(update),
2447 CatalogItemType::Connection => connections.push(update),
2448 CatalogItemType::Source => sources.push(update),
2449 CatalogItemType::Table => tables.push(update),
2450 CatalogItemType::View
2451 | CatalogItemType::MaterializedView
2452 | CatalogItemType::Index => derived_items.push(update),
2453 CatalogItemType::Sink => sinks.push(update),
2454 }
2455 }
2456
2457 sort_items_topological(&mut connections);
2461 sort_items_topological(&mut derived_items);
2462
2463 for group in [
2465 &mut types,
2466 &mut funcs,
2467 &mut secrets,
2468 &mut sources,
2469 &mut tables,
2470 &mut sinks,
2471 ] {
2472 group.sort_by_key(|(item, _, _)| item.id);
2473 }
2474
2475 iter::empty()
2476 .chain(types)
2477 .chain(funcs)
2478 .chain(secrets)
2479 .chain(connections)
2480 .chain(sources)
2481 .chain(tables)
2482 .chain(derived_items)
2483 .chain(sinks)
2484 .collect()
2485 }
2486
2487 let item_retractions = sort_item_updates(item_retractions);
2488 let item_additions = sort_item_updates(item_additions);
2489
2490 fn sort_temp_item_updates(
2494 temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2495 ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2496 let mut types = Vec::new();
2499 let mut funcs = Vec::new();
2501 let mut secrets = Vec::new();
2502 let mut connections = Vec::new();
2503 let mut sources = Vec::new();
2504 let mut tables = Vec::new();
2505 let mut derived_items = Vec::new();
2506 let mut sinks = Vec::new();
2507 for update in temp_item_updates {
2508 match update.0.item_type() {
2509 CatalogItemType::Type => types.push(update),
2510 CatalogItemType::Func => funcs.push(update),
2511 CatalogItemType::Secret => secrets.push(update),
2512 CatalogItemType::Connection => connections.push(update),
2513 CatalogItemType::Source => sources.push(update),
2514 CatalogItemType::Table => tables.push(update),
2515 CatalogItemType::View
2516 | CatalogItemType::MaterializedView
2517 | CatalogItemType::Index => derived_items.push(update),
2518 CatalogItemType::Sink => sinks.push(update),
2519 }
2520 }
2521
2522 for group in [
2524 &mut types,
2525 &mut funcs,
2526 &mut secrets,
2527 &mut connections,
2528 &mut sources,
2529 &mut tables,
2530 &mut derived_items,
2531 &mut sinks,
2532 ] {
2533 group.sort_by_key(|(item, _, _)| item.id);
2534 }
2535
2536 iter::empty()
2537 .chain(types)
2538 .chain(funcs)
2539 .chain(secrets)
2540 .chain(connections)
2541 .chain(sources)
2542 .chain(tables)
2543 .chain(derived_items)
2544 .chain(sinks)
2545 .collect()
2546 }
2547 let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2548 let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2549
2550 fn merge_item_updates(
2552 mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2553 mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2554 ) -> Vec<StateUpdate> {
2555 let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2556
2557 while let (Some((item, _, _)), Some((temp_item, _, _))) =
2558 (item_updates.front(), temp_item_updates.front())
2559 {
2560 if item.id < temp_item.id {
2561 let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2562 state_updates.push(StateUpdate {
2563 kind: StateUpdateKind::Item(item),
2564 ts,
2565 diff,
2566 });
2567 } else if item.id > temp_item.id {
2568 let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2569 state_updates.push(StateUpdate {
2570 kind: StateUpdateKind::TemporaryItem(temp_item),
2571 ts,
2572 diff,
2573 });
2574 } else {
2575 unreachable!(
2576 "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2577 );
2578 }
2579 }
2580
2581 while let Some((item, ts, diff)) = item_updates.pop_front() {
2582 state_updates.push(StateUpdate {
2583 kind: StateUpdateKind::Item(item),
2584 ts,
2585 diff,
2586 });
2587 }
2588
2589 while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2590 state_updates.push(StateUpdate {
2591 kind: StateUpdateKind::TemporaryItem(temp_item),
2592 ts,
2593 diff,
2594 });
2595 }
2596
2597 state_updates
2598 }
2599 let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2600 let item_additions = merge_item_updates(item_additions, temp_item_additions);
2601
2602 iter::empty()
2604 .chain(post_item_retractions.into_iter().rev())
2606 .chain(item_retractions.into_iter().rev())
2607 .chain(other_builtin_retractions.into_iter().rev())
2608 .chain(cluster_retractions.into_iter().rev())
2609 .chain(builtin_source_retractions.into_iter().rev())
2610 .chain(pre_cluster_retractions.into_iter().rev())
2611 .chain(pre_cluster_additions)
2612 .chain(builtin_source_additions)
2613 .chain(cluster_additions)
2614 .chain(other_builtin_additions)
2615 .chain(item_additions)
2616 .chain(post_item_additions)
2617 .collect()
2618}
2619
/// A batch of catalog state updates that are all applied in the same way.
///
/// [`ApplyState::new`] chooses the variant for a single update, [`ApplyState::step`] merges
/// a following state into the current one when both have the same variant, and
/// [`ApplyState::apply`] applies the accumulated batch to a `CatalogState`.
enum ApplyState {
    /// Builtin views to add, each as the static view definition together with its catalog
    /// item ID and global ID. Applied through `CatalogState::parse_builtin_views`.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Introspection source index, system object mapping, temporary item and item updates
    /// (other than builtin view additions). Applied through `apply_updates_inner` inside
    /// `with_enable_for_item_parsing`.
    Items(Vec<StateUpdate>),
    /// All remaining update kinds. Applied directly through `apply_updates_inner`.
    Updates(Vec<StateUpdate>),
}
2636
2637impl ApplyState {
2638 fn new(update: StateUpdate) -> Self {
2639 use StateUpdateKind::*;
2640 match &update.kind {
2641 SystemObjectMapping(som)
2642 if som.description.object_type == CatalogItemType::View
2643 && update.diff == StateDiff::Addition =>
2644 {
2645 let view_addition = lookup_builtin_view_addition(som.clone());
2646 Self::BuiltinViewAdditions(vec![view_addition])
2647 }
2648
2649 IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
2650 Self::Items(vec![update])
2651 }
2652
2653 Role(_)
2654 | RoleAuth(_)
2655 | Database(_)
2656 | Schema(_)
2657 | DefaultPrivilege(_)
2658 | SystemPrivilege(_)
2659 | SystemConfiguration(_)
2660 | ClusterSystemConfiguration(_)
2661 | ReplicaSystemConfiguration(_)
2662 | Cluster(_)
2663 | NetworkPolicy(_)
2664 | ClusterReplica(_)
2665 | SourceReferences(_)
2666 | Comment(_)
2667 | AuditLog(_)
2668 | StorageCollectionMetadata(_)
2669 | UnfinalizedShard(_) => Self::Updates(vec![update]),
2670 }
2671 }
2672
2673 async fn apply(
2679 self,
2680 state: &mut CatalogState,
2681 retractions: &mut InProgressRetractions,
2682 local_expression_cache: &mut LocalExpressionCache,
2683 ) -> (
2684 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2685 Vec<ParsedStateUpdate>,
2686 ) {
2687 match self {
2688 Self::BuiltinViewAdditions(builtin_view_additions) => {
2689 let restore = Arc::clone(&state.system_configuration);
2690 Arc::make_mut(&mut state.system_configuration).enable_for_item_parsing();
2691 let builtin_table_updates = CatalogState::parse_builtin_views(
2692 state,
2693 builtin_view_additions,
2694 retractions,
2695 local_expression_cache,
2696 )
2697 .await;
2698 state.system_configuration = restore;
2699 (builtin_table_updates, Vec::new())
2700 }
2701 Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
2702 state
2703 .apply_updates_inner(updates, retractions, local_expression_cache)
2704 .expect("corrupt catalog")
2705 }),
2706 Self::Updates(updates) => state
2707 .apply_updates_inner(updates, retractions, local_expression_cache)
2708 .expect("corrupt catalog"),
2709 }
2710 }
2711
2712 async fn step(
2713 self,
2714 next: Self,
2715 state: &mut CatalogState,
2716 retractions: &mut InProgressRetractions,
2717 local_expression_cache: &mut LocalExpressionCache,
2718 ) -> (
2719 Self,
2720 (
2721 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
2722 Vec<ParsedStateUpdate>,
2723 ),
2724 ) {
2725 match (self, next) {
2726 (
2727 Self::BuiltinViewAdditions(mut builtin_view_additions),
2728 Self::BuiltinViewAdditions(next_builtin_view_additions),
2729 ) => {
2730 builtin_view_additions.extend(next_builtin_view_additions);
2732 (
2733 Self::BuiltinViewAdditions(builtin_view_additions),
2734 (Vec::new(), Vec::new()),
2735 )
2736 }
2737 (Self::Items(mut updates), Self::Items(next_updates)) => {
2738 updates.extend(next_updates);
2740 (Self::Items(updates), (Vec::new(), Vec::new()))
2741 }
2742 (Self::Updates(mut updates), Self::Updates(next_updates)) => {
2743 updates.extend(next_updates);
2745 (Self::Updates(updates), (Vec::new(), Vec::new()))
2746 }
2747 (apply_state, next_apply_state) => {
2748 let updates = apply_state
2750 .apply(state, retractions, local_expression_cache)
2751 .await;
2752 (next_apply_state, updates)
2753 }
2754 }
2755 }
2756}
2757
/// Insert/remove interface shared by the map types that implement it, so that helpers can be
/// written once against `impl MutableMap<K, V>`.
trait MutableMap<K, V> {
    /// Stores `value` under `key` and returns the value previously stored there, if any.
    fn insert(&mut self, key: K, value: V) -> Option<V>;
    /// Removes the entry for `key` and returns its value, if one was present.
    fn remove(&mut self, key: &K) -> Option<V>;
}

impl<K, V> MutableMap<K, V> for BTreeMap<K, V>
where
    K: Ord,
{
    fn insert(&mut self, key: K, value: V) -> Option<V> {
        // Explicit path to the inherent method, to make clear this is not a recursive call.
        BTreeMap::<K, V>::insert(self, key, value)
    }

    fn remove(&mut self, key: &K) -> Option<V> {
        BTreeMap::<K, V>::remove(self, key)
    }
}
2773
2774impl<K: Ord + Clone, V: Clone> MutableMap<K, V> for imbl::OrdMap<K, V> {
2775 fn insert(&mut self, key: K, value: V) -> Option<V> {
2776 imbl::OrdMap::insert(self, key, value)
2777 }
2778 fn remove(&mut self, key: &K) -> Option<V> {
2779 imbl::OrdMap::remove(self, key)
2780 }
2781}
2782
2783fn apply_inverted_lookup<K, V>(map: &mut impl MutableMap<K, V>, key: &K, value: V, diff: StateDiff)
2788where
2789 K: Ord + Clone + Debug,
2790 V: PartialEq + Debug,
2791{
2792 match diff {
2793 StateDiff::Retraction => {
2794 let prev = map.remove(key);
2795 assert_eq!(
2796 prev,
2797 Some(value),
2798 "retraction does not match existing value: {key:?}"
2799 );
2800 }
2801 StateDiff::Addition => {
2802 let prev = map.insert(key.clone(), value);
2803 assert_eq!(
2804 prev, None,
2805 "values must be explicitly retracted before inserting a new value: {key:?}"
2806 );
2807 }
2808 }
2809}
2810
2811fn apply_with_update<K, V, D>(
2814 map: &mut impl MutableMap<K, V>,
2815 durable: D,
2816 key_fn: impl FnOnce(&D) -> K,
2817 diff: StateDiff,
2818 retractions: &mut BTreeMap<D::Key, V>,
2819) where
2820 K: Ord,
2821 V: UpdateFrom<D> + PartialEq + Debug,
2822 D: DurableType,
2823 D::Key: Ord,
2824{
2825 match diff {
2826 StateDiff::Retraction => {
2827 let mem_key = key_fn(&durable);
2828 let value = map
2829 .remove(&mem_key)
2830 .expect("retraction does not match existing value: {key:?}");
2831 let durable_key = durable.into_key_value().0;
2832 retractions.insert(durable_key, value);
2833 }
2834 StateDiff::Addition => {
2835 let mem_key = key_fn(&durable);
2836 let durable_key = durable.key();
2837 let value = match retractions.remove(&durable_key) {
2838 Some(mut retraction) => {
2839 retraction.update_from(durable);
2840 retraction
2841 }
2842 None => durable.into(),
2843 };
2844 let prev = map.insert(mem_key, value);
2845 assert_eq!(
2846 prev, None,
2847 "values must be explicitly retracted before inserting a new value"
2848 );
2849 }
2850 }
2851}
2852
2853fn lookup_builtin_view_addition(
2855 mapping: SystemObjectMapping,
2856) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2857 let (_, builtin) = BUILTIN_LOOKUP
2858 .get(&mapping.description)
2859 .expect("missing builtin view");
2860 let Builtin::View(view) = builtin else {
2861 unreachable!("programming error, expected BuiltinView found {builtin:?}");
2862 };
2863
2864 (
2865 view,
2866 mapping.unique_identifier.catalog_id,
2867 mapping.unique_identifier.global_id,
2868 )
2869}