1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25 BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28 ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29 SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, Func, Index, Log, NetworkPolicy,
35 Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36 TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_compute_types::dataflows::DataflowDescription;
40use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_ore::collections::CollectionExt;
44use mz_ore::tracing::OpenTelemetryContext;
45use mz_ore::{instrument, soft_assert_eq_or_log, soft_assert_no_log, soft_assert_or_log};
46use mz_pgrepr::oid::INVALID_OID;
47use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
48use mz_repr::role_id::RoleId;
49use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
50use mz_sql::catalog::CatalogError as SqlCatalogError;
51use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
52use mz_sql::names::{
53 FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
54 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
55};
56use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
57use mz_sql::session::vars::{VarError, VarInput};
58use mz_sql::{plan, rbac};
59use mz_sql_parser::ast::Expr;
60use mz_storage_types::sources::Timeline;
61use mz_transform::dataflow::DataflowMetainfo;
62use mz_transform::notice::OptimizerNotice;
63use tracing::{info_span, warn};
64
65use crate::AdapterError;
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{BuiltinTableUpdate, CatalogState};
68use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
69use crate::util::{index_sql, sort_topological};
70
/// Bookkeeping for retractions that have been applied within the current
/// timestamp group but whose paired additions may still follow.
///
/// When an object is retracted, its in-memory representation is stashed here
/// instead of being discarded outright; a subsequent addition at the same
/// timestamp can then reclaim the stashed value (e.g. to preserve dependency
/// links — see `apply_item_update`). Anything still present when the group
/// finishes is treated as truly dropped (see `apply_updates`).
#[derive(Debug, Clone, Default)]
struct InProgressRetractions {
    roles: BTreeMap<RoleKey, Role>,
    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
    databases: BTreeMap<DatabaseKey, Database>,
    schemas: BTreeMap<SchemaKey, Schema>,
    clusters: BTreeMap<ClusterKey, Cluster>,
    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
    // Durable (non-temporary) catalog items, keyed by their durable key.
    items: BTreeMap<ItemKey, CatalogEntry>,
    // Temporary (session-scoped) items, keyed by item ID.
    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}
95
96impl CatalogState {
    /// Applies a batch of durable state updates to the in-memory catalog
    /// state, returning the resulting builtin table updates and parsed
    /// catalog updates.
    ///
    /// Updates are consolidated, grouped by timestamp, and each group is
    /// sorted (`sort_updates`) before being fed through the `ApplyState`
    /// state machine. NOTE(review): `ApplyState` and `sort_updates` are
    /// defined elsewhere in this file; the exact batching semantics of
    /// `ApplyState::step` are assumed from usage here — confirm there.
    #[must_use]
    #[instrument]
    pub(crate) async fn apply_updates(
        &mut self,
        updates: Vec<StateUpdate>,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
        Vec<ParsedStateUpdate>,
    ) {
        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::with_capacity(updates.len());

        // Cancel out matching retraction/addition pairs before doing any work.
        let updates = Self::consolidate_updates(updates);

        // Partition the (timestamp-ordered) updates into one group per
        // timestamp; each group is applied with its own retraction state.
        let mut groups: Vec<Vec<_>> = Vec::new();
        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
            let updates = sort_updates(updates.collect());
            groups.push(updates);
        }

        for updates in groups {
            let mut apply_state = ApplyState::Updates(Vec::new());
            let mut retractions = InProgressRetractions::default();

            // Feed each update through the state machine; every step may
            // flush previously accumulated updates.
            for update in updates {
                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
                    .step(
                        ApplyState::new(update),
                        self,
                        &mut retractions,
                        local_expression_cache,
                    )
                    .await;
                apply_state = next_apply_state;
                builtin_table_updates.extend(builtin_table_update);
                catalog_updates.extend(catalog_update);
            }

            // Flush whatever the state machine is still holding for this group.
            let (builtin_table_update, catalog_update) = apply_state
                .apply(self, &mut retractions, local_expression_cache)
                .await;
            builtin_table_updates.extend(builtin_table_update);
            catalog_updates.extend(catalog_update);

            // Items left in `retractions` were never re-added, i.e. they were
            // truly dropped: discard their optimizer notices and, if notices
            // are surfaced via builtin tables, retract them there too.
            let dropped_entries: Vec<CatalogEntry> = retractions
                .items
                .into_values()
                .chain(retractions.temp_items.into_values())
                .collect();
            if !dropped_entries.is_empty() {
                let dropped_notices = self.drop_optimizer_notices(dropped_entries);
                if self.system_config().enable_mz_notices() {
                    self.pack_optimizer_notice_updates(
                        &mut builtin_table_updates,
                        dropped_notices.iter(),
                        Diff::MINUS_ONE,
                    );
                }
            }
        }

        (builtin_table_updates, catalog_updates)
    }
175
176 fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
184 let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
185 .into_iter()
186 .map(|update| (update.kind, update.ts, update.diff.into()))
187 .collect_vec();
188
189 consolidate_updates(&mut updates);
190
191 updates
192 .into_iter()
193 .map(|(kind, ts, diff)| StateUpdate {
194 kind,
195 ts,
196 diff: diff
197 .try_into()
198 .expect("catalog state cannot have diff other than -1 or 1"),
199 })
200 .collect_vec()
201 }
202
    /// Applies a same-timestamp slice of state updates, producing builtin
    /// table updates and parsed catalog updates for downstream consumers.
    ///
    /// The ordering within each branch matters: for retractions the parsed
    /// update and the builtin table retraction are produced *before* the
    /// state change (so they can observe the pre-retraction state); for
    /// additions the state change happens *first* (so they observe the
    /// post-addition state).
    #[instrument(level = "debug")]
    fn apply_updates_inner(
        &mut self,
        updates: Vec<StateUpdate>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        soft_assert_no_log!(
            updates.iter().map(|update| update.ts).all_equal(),
            "all timestamps should be equal: {updates:?}"
        );

        // Deferred: dyncfgs are refreshed once at the end rather than per
        // system-configuration update.
        let mut update_system_config = false;

        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::new();

        for state_update in updates {
            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
                update_system_config = true;
            }

            match state_update.diff {
                StateDiff::Retraction => {
                    // Parse and generate builtin table retractions against the
                    // still-unmodified (pre-retraction) state.
                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }

                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));
                    self.apply_update(
                        state_update.kind,
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                }
                StateDiff::Addition => {
                    // Apply first so parsing/generation observe the new state.
                    self.apply_update(
                        state_update.kind.clone(),
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));

                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }
                }
            }
        }

        if update_system_config {
            self.system_configuration.dyncfg_updates();
        }

        Ok((builtin_table_updates, catalog_updates))
    }
287
    /// Applies a single state update to the in-memory catalog state by
    /// dispatching to the kind-specific handler.
    ///
    /// Only item updates are currently fallible; every other handler is
    /// infallible, hence the unconditional `Ok(())` at the end.
    #[instrument(level = "debug")]
    fn apply_update(
        &mut self,
        kind: StateUpdateKind,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<(), CatalogError> {
        match kind {
            StateUpdateKind::Role(role) => {
                self.apply_role_update(role, diff, retractions);
            }
            StateUpdateKind::RoleAuth(role_auth) => {
                self.apply_role_auth_update(role_auth, diff, retractions);
            }
            StateUpdateKind::Database(database) => {
                self.apply_database_update(database, diff, retractions);
            }
            StateUpdateKind::Schema(schema) => {
                self.apply_schema_update(schema, diff, retractions);
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                self.apply_default_privilege_update(default_privilege, diff, retractions);
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                self.apply_system_privilege_update(system_privilege, diff, retractions);
            }
            StateUpdateKind::SystemConfiguration(system_configuration) => {
                self.apply_system_configuration_update(system_configuration, diff, retractions);
            }
            StateUpdateKind::Cluster(cluster) => {
                self.apply_cluster_update(cluster, diff, retractions);
            }
            StateUpdateKind::NetworkPolicy(network_policy) => {
                self.apply_network_policy_update(network_policy, diff, retractions);
            }
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.apply_introspection_source_index_update(
                    introspection_source_index,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => {
                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
            }
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                self.apply_system_object_mapping_update(
                    system_object_mapping,
                    diff,
                    retractions,
                    local_expression_cache,
                );
            }
            StateUpdateKind::TemporaryItem(item) => {
                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
            }
            StateUpdateKind::Item(item) => {
                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
            }
            StateUpdateKind::Comment(comment) => {
                self.apply_comment_update(comment, diff, retractions);
            }
            StateUpdateKind::SourceReferences(source_reference) => {
                self.apply_source_references_update(source_reference, diff, retractions);
            }
            StateUpdateKind::AuditLog(_audit_log) => {
                // Deliberately a no-op: audit log events are not reflected in
                // the in-memory catalog state.
            }
            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
                self.apply_storage_collection_metadata_update(
                    storage_collection_metadata,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
            }
        }

        Ok(())
    }
371
372 #[instrument(level = "debug")]
373 fn apply_role_auth_update(
374 &mut self,
375 role_auth: mz_catalog::durable::RoleAuth,
376 diff: StateDiff,
377 retractions: &mut InProgressRetractions,
378 ) {
379 apply_with_update(
380 &mut self.role_auth_by_id,
381 role_auth,
382 |role_auth| role_auth.role_id,
383 diff,
384 &mut retractions.role_auths,
385 );
386 }
387
388 #[instrument(level = "debug")]
389 fn apply_role_update(
390 &mut self,
391 role: mz_catalog::durable::Role,
392 diff: StateDiff,
393 retractions: &mut InProgressRetractions,
394 ) {
395 apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
396 apply_with_update(
397 &mut self.roles_by_id,
398 role,
399 |role| role.id,
400 diff,
401 &mut retractions.roles,
402 );
403 }
404
405 #[instrument(level = "debug")]
406 fn apply_database_update(
407 &mut self,
408 database: mz_catalog::durable::Database,
409 diff: StateDiff,
410 retractions: &mut InProgressRetractions,
411 ) {
412 apply_inverted_lookup(
413 &mut self.database_by_name,
414 &database.name,
415 database.id,
416 diff,
417 );
418 apply_with_update(
419 &mut self.database_by_id,
420 database,
421 |database| database.id,
422 diff,
423 &mut retractions.databases,
424 );
425 }
426
427 #[instrument(level = "debug")]
428 fn apply_schema_update(
429 &mut self,
430 schema: mz_catalog::durable::Schema,
431 diff: StateDiff,
432 retractions: &mut InProgressRetractions,
433 ) {
434 match &schema.database_id {
435 Some(database_id) => {
436 let db = self
437 .database_by_id
438 .get_mut(database_id)
439 .expect("catalog out of sync");
440 apply_inverted_lookup(&mut db.schemas_by_name, &schema.name, schema.id, diff);
441 apply_with_update(
442 &mut db.schemas_by_id,
443 schema,
444 |schema| schema.id,
445 diff,
446 &mut retractions.schemas,
447 );
448 }
449 None => {
450 apply_inverted_lookup(
451 &mut self.ambient_schemas_by_name,
452 &schema.name,
453 schema.id,
454 diff,
455 );
456 apply_with_update(
457 &mut self.ambient_schemas_by_id,
458 schema,
459 |schema| schema.id,
460 diff,
461 &mut retractions.schemas,
462 );
463 }
464 }
465 }
466
467 #[instrument(level = "debug")]
468 fn apply_default_privilege_update(
469 &mut self,
470 default_privilege: mz_catalog::durable::DefaultPrivilege,
471 diff: StateDiff,
472 _retractions: &mut InProgressRetractions,
473 ) {
474 match diff {
475 StateDiff::Addition => Arc::make_mut(&mut self.default_privileges)
476 .grant(default_privilege.object, default_privilege.acl_item),
477 StateDiff::Retraction => Arc::make_mut(&mut self.default_privileges)
478 .revoke(&default_privilege.object, &default_privilege.acl_item),
479 }
480 }
481
482 #[instrument(level = "debug")]
483 fn apply_system_privilege_update(
484 &mut self,
485 system_privilege: MzAclItem,
486 diff: StateDiff,
487 _retractions: &mut InProgressRetractions,
488 ) {
489 match diff {
490 StateDiff::Addition => {
491 Arc::make_mut(&mut self.system_privileges).grant(system_privilege)
492 }
493 StateDiff::Retraction => {
494 Arc::make_mut(&mut self.system_privileges).revoke(&system_privilege)
495 }
496 }
497 }
498
499 #[instrument(level = "debug")]
500 fn apply_system_configuration_update(
501 &mut self,
502 system_configuration: mz_catalog::durable::SystemConfiguration,
503 diff: StateDiff,
504 _retractions: &mut InProgressRetractions,
505 ) {
506 let res = match diff {
507 StateDiff::Addition => self.insert_system_configuration(
508 &system_configuration.name,
509 VarInput::Flat(&system_configuration.value),
510 ),
511 StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
512 };
513 match res {
514 Ok(_) => (),
515 Err(Error {
519 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
520 }) => {
521 warn!(%name, "unknown system parameter from catalog storage");
522 }
523 Err(e) => panic!("unable to update system variable: {e:?}"),
524 }
525 }
526
527 #[instrument(level = "debug")]
528 fn apply_cluster_update(
529 &mut self,
530 cluster: mz_catalog::durable::Cluster,
531 diff: StateDiff,
532 retractions: &mut InProgressRetractions,
533 ) {
534 apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
535 apply_with_update(
536 &mut self.clusters_by_id,
537 cluster,
538 |cluster| cluster.id,
539 diff,
540 &mut retractions.clusters,
541 );
542 }
543
544 #[instrument(level = "debug")]
545 fn apply_network_policy_update(
546 &mut self,
547 policy: mz_catalog::durable::NetworkPolicy,
548 diff: StateDiff,
549 retractions: &mut InProgressRetractions,
550 ) {
551 apply_inverted_lookup(
552 &mut self.network_policies_by_name,
553 &policy.name,
554 policy.id,
555 diff,
556 );
557 apply_with_update(
558 &mut self.network_policies_by_id,
559 policy,
560 |policy| policy.id,
561 diff,
562 &mut retractions.network_policies,
563 );
564 }
565
    /// Applies a durable introspection-source-index update: keeps the owning
    /// cluster's `log_indexes` lookup in sync and inserts/drops the index's
    /// catalog entry.
    ///
    /// On addition, a matching in-progress retraction is reused (the entry is
    /// rebuilt in place) rather than inserted from scratch; the asserts pin
    /// the invariant that ID, OID, and name are stable across such an update.
    #[instrument(level = "debug")]
    fn apply_introspection_source_index_update(
        &mut self,
        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        let cluster = self
            .clusters_by_id
            .get_mut(&introspection_source_index.cluster_id)
            .expect("catalog out of sync");
        // The durable record names a builtin log; resolve it statically.
        let log = BUILTIN_LOG_LOOKUP
            .get(introspection_source_index.name.as_str())
            .expect("missing log");
        // Maintain the cluster's log-variant -> index-ID lookup for both
        // additions and retractions.
        apply_inverted_lookup(
            &mut cluster.log_indexes,
            &log.variant,
            introspection_source_index.index_id,
            diff,
        );

        match diff {
            StateDiff::Addition => {
                if let Some(mut entry) = retractions
                    .introspection_source_indexes
                    .remove(&introspection_source_index.item_id)
                {
                    // Paired retraction+addition at the same timestamp:
                    // rebuild the item and reuse the existing entry.
                    let (index_name, index) = self.create_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.index_id,
                    );
                    assert_eq!(entry.id, introspection_source_index.item_id);
                    assert_eq!(entry.oid, introspection_source_index.oid);
                    assert_eq!(entry.name, index_name);
                    entry.item = index;
                    self.insert_entry(entry);
                } else {
                    // Fresh addition.
                    self.insert_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.item_id,
                        introspection_source_index.index_id,
                        introspection_source_index.oid,
                    );
                }
            }
            StateDiff::Retraction => {
                // Stash the dropped entry so a paired addition can reuse it.
                let entry = self.drop_item(introspection_source_index.item_id);
                retractions
                    .introspection_source_indexes
                    .insert(entry.id, entry);
            }
        }
    }
623
    /// Applies a durable cluster-replica update: keeps the owning cluster's
    /// name -> replica-ID lookup in sync and inserts/removes the in-memory
    /// replica. Asserts enforce that additions never overwrite and
    /// retractions always match an existing replica.
    #[instrument(level = "debug")]
    fn apply_cluster_replica_update(
        &mut self,
        cluster_replica: mz_catalog::durable::ClusterReplica,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        // The cluster is looked up twice on purpose: concretizing the replica
        // location needs `&self` while the cluster is borrowed, so we take an
        // immutable borrow first and re-borrow mutably afterwards.
        let cluster = self
            .clusters_by_id
            .get(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        let azs = cluster.availability_zones();
        let location = self
            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
            .expect("catalog in unexpected state");
        let cluster = self
            .clusters_by_id
            .get_mut(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        apply_inverted_lookup(
            &mut cluster.replica_id_by_name_,
            &cluster_replica.name,
            cluster_replica.replica_id,
            diff,
        );
        match diff {
            StateDiff::Retraction => {
                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
                assert!(
                    prev.is_some(),
                    "retraction does not match existing value: {:?}",
                    cluster_replica.replica_id
                );
            }
            StateDiff::Addition => {
                // Translate the durable replica config into the in-memory
                // controller representation.
                let logging = ReplicaLogging {
                    log_logging: cluster_replica.config.logging.log_logging,
                    interval: cluster_replica.config.logging.interval,
                };
                let config = ReplicaConfig {
                    location,
                    compute: ComputeReplicaConfig { logging },
                };
                let mem_cluster_replica = ClusterReplica {
                    name: cluster_replica.name.clone(),
                    cluster_id: cluster_replica.cluster_id,
                    replica_id: cluster_replica.replica_id,
                    config,
                    owner_id: cluster_replica.owner_id,
                };
                let prev = cluster
                    .replicas_by_id_
                    .insert(cluster_replica.replica_id, mem_cluster_replica);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value: {:?}",
                    cluster_replica.replica_id
                );
            }
        }
    }
685
    /// Applies a durable system-object-mapping update: inserts or drops the
    /// in-memory catalog entry for a builtin object.
    ///
    /// Runtime-alterable builtins are skipped entirely here — NOTE(review):
    /// presumably their in-memory state is maintained through regular item
    /// updates instead; confirm where `runtime_alterable` mappings are
    /// handled. On addition, a matching in-progress retraction is reinserted
    /// as-is; otherwise the builtin definition is looked up and a fresh entry
    /// is constructed per builtin kind.
    #[instrument(level = "debug")]
    fn apply_system_object_mapping_update(
        &mut self,
        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        let item_id = system_object_mapping.unique_identifier.catalog_id;
        let global_id = system_object_mapping.unique_identifier.global_id;

        // Runtime-alterable builtins: nothing to do here.
        if system_object_mapping.unique_identifier.runtime_alterable() {
            return;
        }

        // Retraction: drop the entry and stash it for potential reuse by a
        // paired addition at the same timestamp.
        if let StateDiff::Retraction = diff {
            let entry = self.drop_item(item_id);
            retractions.system_object_mappings.insert(item_id, entry);
            return;
        }

        // Addition paired with an earlier retraction: reinsert the stashed
        // entry unchanged.
        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
            self.insert_entry(entry);
            return;
        }

        // Fresh addition: resolve the builtin definition and its ambient
        // schema, then build the catalog entry per builtin kind.
        let builtin = BUILTIN_LOOKUP
            .get(&system_object_mapping.description)
            .expect("missing builtin")
            .1;
        let schema_name = builtin.schema();
        let schema_id = self
            .ambient_schemas_by_name
            .get(schema_name)
            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
        let name = QualifiedItemName {
            qualifiers: ItemQualifiers {
                database_spec: ResolvedDatabaseSpecifier::Ambient,
                schema_spec: SchemaSpecifier::Id(*schema_id),
            },
            item: builtin.name().into(),
        };
        match builtin {
            Builtin::Log(log) => {
                // Builtin logs get owner privileges (as sources) plus any
                // access grants declared on the builtin.
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&log.access);
                self.insert_item(
                    item_id,
                    log.oid,
                    name.clone(),
                    CatalogItem::Log(Log {
                        variant: log.variant,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }

            Builtin::Table(table) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Table,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&table.access);

                self.insert_item(
                    item_id,
                    table.oid,
                    name.clone(),
                    CatalogItem::Table(Table {
                        create_sql: None,
                        desc: VersionedRelationDesc::new(table.desc.clone()),
                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                        conn_id: None,
                        resolved_ids: ResolvedIds::empty(),
                        // Retained-metrics builtins keep history for the
                        // configured metrics retention window.
                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: table.is_retained_metrics_object,
                        data_source: TableDataSource::TableWrites {
                            defaults: vec![Expr::null(); table.desc.arity()],
                        },
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::Index(index) => {
                let custom_logical_compaction_window =
                    index.is_retained_metrics_object.then(|| {
                        self.system_config()
                            .metrics_retention()
                            .try_into()
                            .expect("invalid metrics retention")
                    });
                // Builtin indexes have a single version; no extra versions.
                let versions = BTreeMap::new();

                // Builtin indexes are defined by SQL; parse the create
                // statement into a catalog item.
                let item = self
                    .parse_item(
                        global_id,
                        &index.create_sql(),
                        &versions,
                        None,
                        index.is_retained_metrics_object,
                        custom_logical_compaction_window,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap index:\n\
                            {}\n\
                            error:\n\
                            {:?}\n\n\
                            make sure that the schema name is specified in the builtin index's create sql statement.",
                            index.name, e
                        )
                    });
                let CatalogItem::Index(_) = item else {
                    panic!(
                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
                        index.name
                    );
                };

                self.insert_item(
                    item_id,
                    index.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }
            Builtin::View(_) => {
                // Builtin views are never applied through this path.
                unreachable!("views added elsewhere");
            }

            Builtin::Type(typ) => {
                let typ = self.resolve_builtin_type_references(typ);
                if let CatalogType::Array { element_reference } = typ.details.typ {
                    // Back-link the element type to its array type.
                    let entry = self.get_entry_mut(&element_reference);
                    let item_type = match &mut entry.item {
                        CatalogItem::Type(item_type) => item_type,
                        _ => unreachable!("types can only reference other types"),
                    };
                    item_type.details.array_id = Some(item_id);
                }

                // Note: types resolve their schema from the builtin's own
                // schema field rather than the `name` computed above.
                let schema_id = self.resolve_system_schema(typ.schema);

                self.insert_item(
                    item_id,
                    typ.oid,
                    QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(schema_id),
                        },
                        item: typ.name.to_owned(),
                    },
                    CatalogItem::Type(Type {
                        create_sql: None,
                        global_id,
                        details: typ.details.clone(),
                        resolved_ids: ResolvedIds::empty(),
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(vec![
                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
                    ]),
                );
            }

            Builtin::Func(func) => {
                // Builtin functions are inserted with an invalid OID here.
                // NOTE(review): presumably their real OIDs are assigned
                // elsewhere — confirm.
                let oid = INVALID_OID;
                self.insert_item(
                    item_id,
                    oid,
                    name.clone(),
                    CatalogItem::Func(Func {
                        inner: func.inner,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }

            Builtin::Source(coll) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&coll.access);

                self.insert_item(
                    item_id,
                    coll.oid,
                    name.clone(),
                    CatalogItem::Source(Source {
                        create_sql: None,
                        data_source: coll.data_source.clone(),
                        desc: coll.desc.clone(),
                        global_id,
                        timeline: Timeline::EpochMilliseconds,
                        resolved_ids: ResolvedIds::empty(),
                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: coll.is_retained_metrics_object,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::MaterializedView(mv) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::MaterializedView,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&mv.access);

                let custom_logical_compaction_window = mv.is_retained_metrics_object.then(|| {
                    self.system_config()
                        .metrics_retention()
                        .try_into()
                        .expect("invalid metrics retention")
                });

                let versions = BTreeMap::new();

                let mut item = self
                    .parse_item(
                        global_id,
                        &mv.create_sql(),
                        &versions,
                        None,
                        mv.is_retained_metrics_object,
                        custom_logical_compaction_window,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap materialized view:\n\
                            {}\n\
                            error:\n\
                            {e:?}\n\n\
                            make sure that the schema name is specified in the builtin \
                            materialized view's create sql statement.",
                            mv.name,
                        )
                    });
                let CatalogItem::MaterializedView(catalog_mv) = &mut item else {
                    panic!(
                        "internal error: builtin materialized view {}'s SQL does not begin \
                        with \"CREATE MATERIALIZED VIEW\".",
                        mv.name,
                    );
                };

                // Propagate the key information declared on the builtin's
                // relation description onto the parsed item's description.
                let mut desc = catalog_mv.desc.latest();
                for key in &mv.desc.typ().keys {
                    desc = desc.with_key(key.clone());
                }
                catalog_mv.desc = VersionedRelationDesc::new(desc);

                self.insert_item(
                    item_id,
                    mv.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::Connection(connection) => {
                let versions = BTreeMap::new();
                let mut item = self
                    .parse_item(
                        global_id,
                        connection.sql,
                        &versions,
                        None,
                        false,
                        None,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap connection:\n\
                            {}\n\
                            error:\n\
                            {:?}\n\n\
                            make sure that the schema name is specified in the builtin connection's create sql statement.",
                            connection.name, e
                        )
                    });
                let CatalogItem::Connection(_) = &mut item else {
                    panic!(
                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
                        connection.name
                    );
                };

                // Unlike most builtins, connections carry their own owner.
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    connection.owner_id.clone(),
                )];
                acl_items.extend_from_slice(connection.access);

                self.insert_item(
                    item_id,
                    connection.oid,
                    name.clone(),
                    item,
                    connection.owner_id.clone(),
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
        }
    }
1043
    /// Applies a durable temporary-item update: inserts or drops a
    /// session-scoped catalog entry.
    ///
    /// On addition, the owning connection's temporary schema is created on
    /// demand, and a matching in-progress retraction is reused — the item is
    /// only re-deserialized when its create SQL actually changed.
    #[instrument(level = "debug")]
    fn apply_temporary_item_update(
        &mut self,
        temporary_item: TemporaryItem,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        match diff {
            StateDiff::Addition => {
                let TemporaryItem {
                    id,
                    oid,
                    global_id,
                    schema_id,
                    name,
                    conn_id,
                    create_sql,
                    owner_id,
                    privileges,
                    extra_versions,
                } = temporary_item;
                let temp_conn_id = conn_id
                    .as_ref()
                    .expect("temporary items must have a connection id");
                // Lazily create the connection's temporary schema.
                if !self.temporary_schemas.contains_key(temp_conn_id) {
                    self.create_temporary_schema(temp_conn_id, owner_id)
                        .expect("failed to create temporary schema");
                }
                let schema = self.find_temp_schema(&schema_id);
                let name = QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: schema.database().clone(),
                        schema_spec: schema.id().clone(),
                    },
                    item: name.clone(),
                };

                let entry = match retractions.temp_items.remove(&id) {
                    Some(mut retraction) => {
                        assert_eq!(retraction.id, id);

                        // Only re-deserialize when the definition changed;
                        // otherwise keep the already-built item.
                        if retraction.create_sql() != create_sql {
                            let mut catalog_item = self
                                .deserialize_item(
                                    global_id,
                                    &create_sql,
                                    &extra_versions,
                                    local_expression_cache,
                                    Some(retraction.item),
                                )
                                .unwrap_or_else(|e| {
                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
                                });
                            catalog_item.set_conn_id(conn_id);
                            retraction.item = catalog_item;
                        }

                        // Refresh the metadata that may change across the
                        // retraction/addition pair.
                        retraction.id = id;
                        retraction.oid = oid;
                        retraction.name = name;
                        retraction.owner_id = owner_id;
                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
                        retraction
                    }
                    None => {
                        // Fresh addition: build the entry from scratch.
                        let mut catalog_item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                None,
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });

                        catalog_item.set_conn_id(conn_id);

                        CatalogEntry {
                            item: catalog_item,
                            referenced_by: Vec::new(),
                            used_by: Vec::new(),
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                        }
                    }
                };
                self.insert_entry(entry);
            }
            StateDiff::Retraction => {
                // Stash the dropped entry so a paired addition can reuse it.
                let entry = self.drop_item(temporary_item.id);
                retractions.temp_items.insert(temporary_item.id, entry);
            }
        }
    }
1161
    /// Applies a durable item update: inserts or drops a non-temporary
    /// catalog entry.
    ///
    /// On addition, a matching in-progress retraction is reused so that the
    /// entry's `referenced_by`/`used_by` dependency links survive an
    /// alter-style retraction/addition pair; the previous item is also passed
    /// to `deserialize_item` as context.
    #[instrument(level = "debug")]
    fn apply_item_update(
        &mut self,
        item: mz_catalog::durable::Item,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<(), CatalogError> {
        match diff {
            StateDiff::Addition => {
                // Capture the durable key before destructuring `item`.
                let key = item.key();
                let mz_catalog::durable::Item {
                    id,
                    oid,
                    global_id,
                    schema_id,
                    name,
                    create_sql,
                    owner_id,
                    privileges,
                    extra_versions,
                } = item;
                let schema = self.find_non_temp_schema(&schema_id);
                let name = QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: schema.database().clone(),
                        schema_spec: schema.id().clone(),
                    },
                    item: name.clone(),
                };
                let entry = match retractions.items.remove(&key) {
                    Some(retraction) => {
                        assert_eq!(retraction.id, item.id);

                        // Rebuild the item from its SQL, reusing the retracted
                        // item as the "previous" version.
                        let item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                Some(retraction.item),
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });

                        // Carry the dependency links over from the retraction.
                        CatalogEntry {
                            item,
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                            referenced_by: retraction.referenced_by,
                            used_by: retraction.used_by,
                        }
                    }
                    None => {
                        // Fresh addition: no pre-existing dependency links.
                        let catalog_item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                None,
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });
                        CatalogEntry {
                            item: catalog_item,
                            referenced_by: Vec::new(),
                            used_by: Vec::new(),
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                        }
                    }
                };

                self.insert_entry(entry);
            }
            StateDiff::Retraction => {
                // Stash the dropped entry under its durable key so a paired
                // addition can reuse it.
                let entry = self.drop_item(item.id);
                let key = item.into_key_value().0;
                retractions.items.insert(key, entry);
            }
        }
        Ok(())
    }
1254
1255 #[instrument(level = "debug")]
1256 fn apply_comment_update(
1257 &mut self,
1258 comment: mz_catalog::durable::Comment,
1259 diff: StateDiff,
1260 _retractions: &mut InProgressRetractions,
1261 ) {
1262 match diff {
1263 StateDiff::Addition => {
1264 let prev = Arc::make_mut(&mut self.comments).update_comment(
1265 comment.object_id,
1266 comment.sub_component,
1267 Some(comment.comment),
1268 );
1269 assert_eq!(
1270 prev, None,
1271 "values must be explicitly retracted before inserting a new value"
1272 );
1273 }
1274 StateDiff::Retraction => {
1275 let prev = Arc::make_mut(&mut self.comments).update_comment(
1276 comment.object_id,
1277 comment.sub_component,
1278 None,
1279 );
1280 assert_eq!(
1281 prev,
1282 Some(comment.comment),
1283 "retraction does not match existing value: ({:?}, {:?})",
1284 comment.object_id,
1285 comment.sub_component,
1286 );
1287 }
1288 }
1289 }
1290
1291 #[instrument(level = "debug")]
1292 fn apply_source_references_update(
1293 &mut self,
1294 source_references: mz_catalog::durable::SourceReferences,
1295 diff: StateDiff,
1296 _retractions: &mut InProgressRetractions,
1297 ) {
1298 match diff {
1299 StateDiff::Addition => {
1300 let prev = self
1301 .source_references
1302 .insert(source_references.source_id, source_references.into());
1303 assert!(
1304 prev.is_none(),
1305 "values must be explicitly retracted before inserting a new value: {prev:?}"
1306 );
1307 }
1308 StateDiff::Retraction => {
1309 let prev = self.source_references.remove(&source_references.source_id);
1310 assert!(
1311 prev.is_some(),
1312 "retraction for a non-existent existing value: {source_references:?}"
1313 );
1314 }
1315 }
1316 }
1317
1318 #[instrument(level = "debug")]
1319 fn apply_storage_collection_metadata_update(
1320 &mut self,
1321 storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1322 diff: StateDiff,
1323 _retractions: &mut InProgressRetractions,
1324 ) {
1325 apply_inverted_lookup(
1326 &mut Arc::make_mut(&mut self.storage_metadata).collection_metadata,
1327 &storage_collection_metadata.id,
1328 storage_collection_metadata.shard,
1329 diff,
1330 );
1331 }
1332
1333 #[instrument(level = "debug")]
1334 fn apply_unfinalized_shard_update(
1335 &mut self,
1336 unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1337 diff: StateDiff,
1338 _retractions: &mut InProgressRetractions,
1339 ) {
1340 match diff {
1341 StateDiff::Addition => {
1342 let newly_inserted = Arc::make_mut(&mut self.storage_metadata)
1343 .unfinalized_shards
1344 .insert(unfinalized_shard.shard);
1345 assert!(
1346 newly_inserted,
1347 "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1348 );
1349 }
1350 StateDiff::Retraction => {
1351 let removed = Arc::make_mut(&mut self.storage_metadata)
1352 .unfinalized_shards
1353 .remove(&unfinalized_shard.shard);
1354 assert!(
1355 removed,
1356 "retraction does not match existing value: {unfinalized_shard:?}"
1357 );
1358 }
1359 }
1360 }
1361
1362 #[instrument]
1365 pub(crate) fn generate_builtin_table_updates(
1366 &self,
1367 updates: Vec<StateUpdate>,
1368 ) -> Vec<BuiltinTableUpdate> {
1369 let mut builtin_table_updates = Vec::new();
1370 for StateUpdate { kind, ts: _, diff } in updates {
1371 let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1372 let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1373 builtin_table_updates.extend(builtin_table_update);
1374 }
1375 builtin_table_updates
1376 }
1377
    /// Generates the builtin table updates implied by a single durable state
    /// update of the given `kind` and `diff`.
    ///
    /// Most kinds delegate to a matching `pack_*` helper. Kinds with no
    /// builtin table representation here (databases, schemas, network
    /// policies, storage metadata, unfinalized shards, system configuration)
    /// produce no updates. NOTE(review): presumably those kinds are packed
    /// elsewhere or intentionally unreflected — confirm against the callers.
    #[instrument(level = "debug")]
    pub(crate) fn generate_builtin_table_update(
        &self,
        kind: StateUpdateKind,
        diff: StateDiff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        // Convert the state diff into the diff representation used by
        // builtin table updates.
        let diff = diff.into();
        match kind {
            StateUpdateKind::Role(role) => self.pack_role_update(role.id, diff),
            StateUpdateKind::RoleAuth(role_auth) => {
                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                vec![self.pack_default_privileges_update(
                    &default_privilege.object,
                    &default_privilege.acl_item.grantee,
                    &default_privilege.acl_item.acl_mode,
                    diff,
                )]
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                vec![self.pack_system_privileges_update(system_privilege, diff)]
            }
            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.pack_item_update(introspection_source_index.item_id, diff)
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
                cluster_replica.cluster_id,
                &cluster_replica.name,
                diff,
            ),
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                // Runtime-alterable system objects are skipped here.
                // NOTE(review): presumably their updates are generated via
                // the corresponding Item updates instead — confirm.
                if !system_object_mapping.unique_identifier.runtime_alterable() {
                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
                } else {
                    vec![]
                }
            }
            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
                comment.object_id,
                comment.sub_component,
                &comment.comment,
                diff,
            )],
            StateUpdateKind::SourceReferences(source_references) => {
                self.pack_source_references_update(&source_references, diff)
            }
            StateUpdateKind::AuditLog(audit_log) => {
                vec![
                    self.pack_audit_log_update(&audit_log.event, diff)
                        .expect("could not pack audit log update"),
                ]
            }
            // These kinds have no builtin table updates generated here.
            StateUpdateKind::Database(_)
            | StateUpdateKind::Schema(_)
            | StateUpdateKind::NetworkPolicy(_)
            | StateUpdateKind::StorageCollectionMetadata(_)
            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
        }
    }
1447
1448 fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1449 self.entry_by_id
1450 .get_mut(id)
1451 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1452 }
1453
1454 pub(super) fn set_optimized_plan(
1459 &mut self,
1460 id: GlobalId,
1461 plan: DataflowDescription<mz_expr::OptimizedMirRelationExpr>,
1462 ) {
1463 let item_id = self.entry_by_global_id[&id];
1464 let entry = self.get_entry_mut(&item_id);
1465 match entry.item_mut() {
1466 CatalogItem::Index(idx) => idx.optimized_plan = Some(Arc::new(plan)),
1467 CatalogItem::MaterializedView(mv) => mv.optimized_plan = Some(Arc::new(plan)),
1468 other => panic!("set_optimized_plan called on {} ({:?})", id, other.typ()),
1469 }
1470 }
1471
1472 pub(super) fn set_physical_plan(
1477 &mut self,
1478 id: GlobalId,
1479 plan: DataflowDescription<mz_compute_types::plan::Plan>,
1480 ) {
1481 let item_id = self.entry_by_global_id[&id];
1482 let entry = self.get_entry_mut(&item_id);
1483 match entry.item_mut() {
1484 CatalogItem::Index(idx) => idx.physical_plan = Some(Arc::new(plan)),
1485 CatalogItem::MaterializedView(mv) => mv.physical_plan = Some(Arc::new(plan)),
1486 other => panic!("set_physical_plan called on {} ({:?})", id, other.typ()),
1487 }
1488 }
1489
1490 pub(super) fn set_dataflow_metainfo(
1495 &mut self,
1496 id: GlobalId,
1497 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
1498 ) {
1499 for notice in metainfo.optimizer_notices.iter() {
1501 for dep_id in notice.dependencies.iter() {
1502 self.notices_by_dep_id
1503 .entry(*dep_id)
1504 .or_default()
1505 .push(Arc::clone(notice));
1506 }
1507 if let Some(item_id) = notice.item_id {
1508 soft_assert_eq_or_log!(
1509 item_id,
1510 id,
1511 "notice.item_id should match the id for whom we are saving the notice"
1512 );
1513 }
1514 }
1515 let item_id = self.entry_by_global_id[&id];
1517 let entry = self.get_entry_mut(&item_id);
1518 match entry.item_mut() {
1519 CatalogItem::Index(idx) => idx.dataflow_metainfo = Some(metainfo),
1520 CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo = Some(metainfo),
1521 other => panic!("set_dataflow_metainfo called on {} ({:?})", id, other.typ()),
1522 }
1523 }
1524
    /// Removes all optimizer notices associated with the given dropped
    /// entries and returns the set of notices that were dropped.
    ///
    /// Three passes: (1) take the notices owned by the dropped entries and
    /// unlink them from the `notices_by_dep_id` index; (2) drop notices that
    /// merely *depend* on one of the dropped global ids, scrubbing them from
    /// the dataflow metainfo of their owning items; (3) remove the dropped
    /// notices from the index buckets of dependencies that are not
    /// themselves being dropped.
    #[mz_ore::instrument(level = "trace")]
    pub(super) fn drop_optimizer_notices(
        &mut self,
        dropped_entries: Vec<CatalogEntry>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        let mut dropped_notices = BTreeSet::new();
        let mut drop_ids = BTreeSet::new();

        // Pass 1: notices owned by the dropped entries themselves.
        for mut entry in dropped_entries {
            drop_ids.extend(entry.global_ids());
            let metainfo = match entry.item_mut() {
                CatalogItem::Index(idx) => idx.dataflow_metainfo.take(),
                CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.take(),
                _ => None,
            };
            if let Some(mut metainfo) = metainfo {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by \
                    `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    // Unlink the notice from the index bucket of every
                    // dependency, pruning buckets that become empty.
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.notices_by_dep_id.get_mut(dep_id) {
                            notices.retain(|x| &n != x);
                            if notices.is_empty() {
                                self.notices_by_dep_id.remove(dep_id);
                            }
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Pass 2: notices owned by *other* items that depend on a dropped
        // global id must also be dropped; scrub each from its owner's
        // dataflow metainfo (if the owner still exists).
        for id in &drop_ids {
            if let Some(notices) = self.notices_by_dep_id.remove(id) {
                for n in notices.into_iter() {
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(entry) = self.try_get_entry_by_global_id(item_id) {
                            let catalog_item_id = entry.id();
                            let entry = self.get_entry_mut(&catalog_item_id);
                            let item = entry.item_mut();
                            match item {
                                CatalogItem::Index(idx) => {
                                    if let Some(ref mut m) = idx.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                CatalogItem::MaterializedView(mv) => {
                                    if let Some(ref mut m) = mv.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                _ => {}
                            }
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Pass 3: purge dropped notices from the index buckets of surviving
        // dependencies.
        let todo_dep_ids: BTreeSet<GlobalId> = dropped_notices
            .iter()
            .flat_map(|n| n.dependencies.iter())
            .filter(|dep_id| !drop_ids.contains(dep_id))
            .copied()
            .collect();
        for id in todo_dep_ids {
            if let Some(notices) = self.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n));
                if notices.is_empty() {
                    self.notices_by_dep_id.remove(&id);
                }
            }
        }

        dropped_notices
    }
1627
1628 fn get_schema_mut(
1629 &mut self,
1630 database_spec: &ResolvedDatabaseSpecifier,
1631 schema_spec: &SchemaSpecifier,
1632 conn_id: &ConnectionId,
1633 ) -> &mut Schema {
1634 match (database_spec, schema_spec) {
1636 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1637 .temporary_schemas
1638 .get_mut(conn_id)
1639 .expect("catalog out of sync"),
1640 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1641 .ambient_schemas_by_id
1642 .get_mut(id)
1643 .expect("catalog out of sync"),
1644 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1645 .database_by_id
1646 .get_mut(database_id)
1647 .expect("catalog out of sync")
1648 .schemas_by_id
1649 .get_mut(schema_id)
1650 .expect("catalog out of sync"),
1651 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1652 unreachable!("temporary schemas are in the ambient database")
1653 }
1654 }
1655 }
1656
    /// Parses and installs the given builtin views, returning the builtin
    /// table updates for the newly inserted items.
    ///
    /// Views whose item id has an in-progress retraction reuse the retracted
    /// entry directly. The remaining views are parsed concurrently on
    /// blocking tasks; because builtin views may depend on one another, a
    /// view that fails with a missing-dependency error is parked until the
    /// dependency (tracked by id or by fully qualified name) has been
    /// installed, and then retried. Views failing with `InvalidCast` are
    /// retried only once everything else has settled (`awaiting_all`). Any
    /// other parse error panics.
    #[instrument(name = "catalog::parse_views")]
    async fn parse_builtin_views(
        state: &mut CatalogState,
        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
        // Split into views that reuse a retracted entry (`updates`) and views
        // that must be parsed from scratch (`additions`).
        let (updates, additions): (Vec<_>, Vec<_>) =
            builtin_views
                .into_iter()
                .partition_map(|(view, item_id, gid)| {
                    match retractions.system_object_mappings.remove(&item_id) {
                        Some(entry) => Either::Left(entry),
                        None => Either::Right((view, item_id, gid)),
                    }
                });

        for entry in updates {
            let item_id = entry.id();
            state.insert_entry(entry);
            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
        }

        // In-flight parse tasks.
        let mut handles = Vec::new();
        // Views parked on a missing dependency, keyed by the dependency's id.
        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
            BTreeMap::new();
        // Same, but keyed by the dependency's fully qualified name.
        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
        // Views deferred (InvalidCast) until all other work has drained.
        let mut awaiting_all = Vec::new();
        // Successfully installed views, by id and by full name, used to
        // decide whether a "missing" dependency is actually ready.
        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
        let mut completed_names: BTreeSet<String> = BTreeSet::new();

        // Views still waiting to be installed.
        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
            .into_iter()
            .map(|(view, item_id, gid)| (item_id, (view, gid)))
            .collect();
        let item_ids: Vec<_> = views.keys().copied().collect();

        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
            // Once nothing else is in flight, retry the deferred views.
            if handles.is_empty() && ready.is_empty() {
                ready.extend(awaiting_all.drain(..));
            }

            if !ready.is_empty() {
                // Snapshot the current state so parse tasks can run off
                // thread while we keep mutating `state`.
                let spawn_state = Arc::new(state.clone());
                while let Some(id) = ready.pop_front() {
                    let (view, global_id) = views.get(&id).expect("must exist");
                    let global_id = *global_id;
                    let create_sql = view.create_sql();
                    let versions = BTreeMap::new();

                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
                    let task_state = Arc::clone(&spawn_state);
                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
                    let handle = mz_ore::task::spawn_blocking(
                        || "parse view",
                        move || {
                            span.in_scope(|| {
                                let res = task_state.parse_item_inner(
                                    global_id,
                                    &create_sql,
                                    &versions,
                                    None,
                                    false,
                                    None,
                                    cached_expr,
                                    None,
                                );
                                (id, global_id, res)
                            })
                        },
                    );
                    handles.push(handle);
                }
            }

            // Harvest whichever parse task finishes first.
            let (selected, _idx, remaining) = future::select_all(handles).await;
            handles = remaining;
            let (id, global_id, res) = selected;
            // Return a cached expression to the cache when a parse attempt
            // fails, so a retry can use it again.
            let mut insert_cached_expr = |cached_expr| {
                if let Some(cached_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
                }
            };
            match res {
                Ok((item, uncached_expr)) => {
                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                        local_expression_cache.insert_uncached_expression(
                            global_id,
                            uncached_expr,
                            optimizer_features,
                        );
                    }
                    let (view, _gid) = views.remove(&id).expect("must exist");
                    let schema_id = state
                        .ambient_schemas_by_name
                        .get(view.schema)
                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
                    let qname = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(*schema_id),
                        },
                        item: view.name.into(),
                    };
                    // Builtin views are owned by the system role; additional
                    // access comes from the view's declared ACL.
                    let mut acl_items = vec![rbac::owner_privilege(
                        mz_sql::catalog::ObjectType::View,
                        MZ_SYSTEM_ROLE_ID,
                    )];
                    acl_items.extend_from_slice(&view.access);

                    state.insert_item(
                        id,
                        view.oid,
                        qname,
                        item,
                        MZ_SYSTEM_ROLE_ID,
                        PrivilegeMap::from_mz_acl_items(acl_items),
                    );

                    // Wake any views that were parked on this one, by id or
                    // by name.
                    let mut resolved_dependent_items = Vec::new();
                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    let entry = state.get_entry(&id);
                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    ready.extend(resolved_dependent_items);

                    completed_ids.insert(id);
                    completed_names.insert(full_name);
                }
                // Missing dependency reported by id: park until it completes
                // (or retry immediately if it already has).
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    if completed_ids.contains(&missing_dep) {
                        ready.push_back(id);
                    } else {
                        awaiting_id_dependencies
                            .entry(missing_dep)
                            .or_default()
                            .push(id);
                    }
                }
                // Missing dependency reported by name: the name may actually
                // be a stringified id, so try that first.
                Err((
                    AdapterError::PlanError(plan::PlanError::Catalog(
                        SqlCatalogError::UnknownItem(missing_dep),
                    )),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    match CatalogItemId::from_str(&missing_dep) {
                        Ok(missing_dep) => {
                            if completed_ids.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_id_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                        Err(_) => {
                            if completed_names.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_name_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                    }
                }
                // Defer InvalidCast failures until everything else is done.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    awaiting_all.push(id);
                }
                // Any other error is a bug in the builtin view definition.
                Err((e, _)) => {
                    let (bad_view, _gid) = views.get(&id).expect("must exist");
                    panic!(
                        "internal error: failed to load bootstrap view:\n\
                        {name}\n\
                        error:\n\
                        {e:?}\n\n\
                        Make sure that the schema name is specified in the builtin view's create sql statement.
                        ",
                        name = bad_view.name,
                    )
                }
            }
        }

        // Every view must have been installed and nothing may still be
        // parked on a dependency.
        assert!(awaiting_id_dependencies.is_empty());
        assert!(
            awaiting_name_dependencies.is_empty(),
            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
        );
        assert!(awaiting_all.is_empty());
        assert!(views.is_empty());

        builtin_table_updates.extend(
            item_ids
                .into_iter()
                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
        );

        builtin_table_updates
    }
1901
1902 fn insert_entry(&mut self, entry: CatalogEntry) {
1904 if !entry.id.is_system() {
1905 if let Some(cluster_id) = entry.item.cluster_id() {
1906 self.clusters_by_id
1907 .get_mut(&cluster_id)
1908 .expect("catalog out of sync")
1909 .bound_objects
1910 .insert(entry.id);
1911 };
1912 }
1913
1914 for u in entry.references().items() {
1915 match self.entry_by_id.get_mut(u) {
1916 Some(metadata) => metadata.referenced_by.push(entry.id()),
1917 None => panic!(
1918 "Catalog: missing dependent catalog item {} while installing {}",
1919 &u,
1920 self.resolve_full_name(entry.name(), entry.conn_id())
1921 ),
1922 }
1923 }
1924 for u in entry.uses() {
1925 if u == entry.id() {
1928 continue;
1929 }
1930 match self.entry_by_id.get_mut(&u) {
1931 Some(metadata) => metadata.used_by.push(entry.id()),
1932 None => panic!(
1933 "Catalog: missing dependent catalog item {} while installing {}",
1934 &u,
1935 self.resolve_full_name(entry.name(), entry.conn_id())
1936 ),
1937 }
1938 }
1939 for gid in entry.item.global_ids() {
1940 self.entry_by_global_id.insert(gid, entry.id());
1941 }
1942 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
1943 if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
1946 && !self.temporary_schemas.contains_key(conn_id)
1947 {
1948 self.create_temporary_schema(conn_id, entry.owner_id)
1949 .expect("failed to create temporary schema");
1950 }
1951 let schema = self.get_schema_mut(
1952 &entry.name().qualifiers.database_spec,
1953 &entry.name().qualifiers.schema_spec,
1954 conn_id,
1955 );
1956
1957 let prev_id = match entry.item() {
1958 CatalogItem::Func(_) => schema
1959 .functions
1960 .insert(entry.name().item.clone(), entry.id()),
1961 CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
1962 _ => schema.items.insert(entry.name().item.clone(), entry.id()),
1963 };
1964
1965 assert!(
1966 prev_id.is_none(),
1967 "builtin name collision on {:?}",
1968 entry.name().item.clone()
1969 );
1970
1971 self.entry_by_id.insert(entry.id(), entry.clone());
1972 }
1973
1974 fn insert_item(
1976 &mut self,
1977 id: CatalogItemId,
1978 oid: u32,
1979 name: QualifiedItemName,
1980 item: CatalogItem,
1981 owner_id: RoleId,
1982 privileges: PrivilegeMap,
1983 ) {
1984 let entry = CatalogEntry {
1985 item,
1986 name,
1987 id,
1988 oid,
1989 used_by: Vec::new(),
1990 referenced_by: Vec::new(),
1991 owner_id,
1992 privileges,
1993 };
1994
1995 self.insert_entry(entry);
1996 }
1997
    /// Removes the catalog entry for `id` from the in-memory state and
    /// returns it, unwinding the bookkeeping performed by `insert_entry`:
    /// dependency back-edges, the global-id index, the schema name maps, and
    /// the cluster's bound-object set.
    ///
    /// Panics ("catalog out of sync") if the item, its schema map entry, or
    /// its cluster binding is missing.
    #[mz_ore::instrument(level = "trace")]
    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
        // Remove the reverse dependency edges this item contributed.
        for u in metadata.references().items() {
            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
            }
        }
        for u in metadata.uses() {
            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
                dep_metadata.used_by.retain(|u| *u != metadata.id())
            }
        }
        // Drop every global-id mapping for this item.
        for gid in metadata.global_ids() {
            self.entry_by_global_id.remove(&gid);
        }

        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema = self.get_schema_mut(
            &metadata.name().qualifiers.database_spec,
            &metadata.name().qualifiers.schema_spec,
            conn_id,
        );
        // Types live in a separate name map within the schema.
        if metadata.item_type() == CatalogItemType::Type {
            schema
                .types
                .remove(&metadata.name().item)
                .expect("catalog out of sync");
        } else {
            // Functions live in `schema.functions`; this path is not
            // expected to see them.
            assert_ne!(metadata.item_type(), CatalogItemType::Func);

            schema
                .items
                .remove(&metadata.name().item)
                .expect("catalog out of sync");
        };

        // Unbind the object from its cluster, if any.
        if !id.is_system() {
            if let Some(cluster_id) = metadata.item().cluster_id() {
                assert!(
                    self.clusters_by_id
                        .get_mut(&cluster_id)
                        .expect("catalog out of sync")
                        .bound_objects
                        .remove(&id),
                    "catalog out of sync"
                );
            }
        }

        metadata
    }
2052
2053 fn insert_introspection_source_index(
2054 &mut self,
2055 cluster_id: ClusterId,
2056 log: &'static BuiltinLog,
2057 item_id: CatalogItemId,
2058 global_id: GlobalId,
2059 oid: u32,
2060 ) {
2061 let (index_name, index) =
2062 self.create_introspection_source_index(cluster_id, log, global_id);
2063 self.insert_item(
2064 item_id,
2065 oid,
2066 index_name,
2067 index,
2068 MZ_SYSTEM_ROLE_ID,
2069 PrivilegeMap::default(),
2070 );
2071 }
2072
2073 fn create_introspection_source_index(
2074 &self,
2075 cluster_id: ClusterId,
2076 log: &'static BuiltinLog,
2077 global_id: GlobalId,
2078 ) -> (QualifiedItemName, CatalogItem) {
2079 let source_name = FullItemName {
2080 database: RawDatabaseSpecifier::Ambient,
2081 schema: log.schema.into(),
2082 item: log.name.into(),
2083 };
2084 let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
2085 let mut index_name = QualifiedItemName {
2086 qualifiers: ItemQualifiers {
2087 database_spec: ResolvedDatabaseSpecifier::Ambient,
2088 schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
2089 },
2090 item: index_name.clone(),
2091 };
2092 index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
2093 let index_item_name = index_name.item.clone();
2094 let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
2095 let index = CatalogItem::Index(Index {
2096 global_id,
2097 on: log_global_id,
2098 keys: log
2099 .variant
2100 .index_by()
2101 .into_iter()
2102 .map(MirScalarExpr::column)
2103 .collect(),
2104 create_sql: index_sql(
2105 index_item_name,
2106 cluster_id,
2107 source_name,
2108 &log.variant.desc(),
2109 &log.variant.index_by(),
2110 ),
2111 conn_id: None,
2112 resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
2113 cluster_id,
2114 is_retained_metrics_object: false,
2115 custom_logical_compaction_window: None,
2116 optimized_plan: None,
2117 physical_plan: None,
2118 dataflow_metainfo: None,
2119 });
2120 (index_name, index)
2121 }
2122
2123 fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
2128 Ok(Arc::make_mut(&mut self.system_configuration).set(name, value)?)
2129 }
2130
2131 fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
2136 Ok(Arc::make_mut(&mut self.system_configuration).reset(name)?)
2137 }
2138}
2139
2140fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
2148 fn push_update<T>(
2149 update: T,
2150 diff: StateDiff,
2151 retractions: &mut Vec<T>,
2152 additions: &mut Vec<T>,
2153 ) {
2154 match diff {
2155 StateDiff::Retraction => retractions.push(update),
2156 StateDiff::Addition => additions.push(update),
2157 }
2158 }
2159
2160 soft_assert_no_log!(
2161 updates.iter().map(|update| update.ts).all_equal(),
2162 "all timestamps should be equal: {updates:?}"
2163 );
2164 soft_assert_no_log!(
2165 {
2166 let mut dedup = BTreeSet::new();
2167 updates.iter().all(|update| dedup.insert(&update.kind))
2168 },
2169 "updates should be consolidated: {updates:?}"
2170 );
2171
2172 let mut pre_cluster_retractions = Vec::new();
2174 let mut pre_cluster_additions = Vec::new();
2175 let mut cluster_retractions = Vec::new();
2176 let mut cluster_additions = Vec::new();
2177 let mut builtin_item_updates = Vec::new();
2178 let mut item_retractions = Vec::new();
2179 let mut item_additions = Vec::new();
2180 let mut temp_item_retractions = Vec::new();
2181 let mut temp_item_additions = Vec::new();
2182 let mut post_item_retractions = Vec::new();
2183 let mut post_item_additions = Vec::new();
2184 for update in updates {
2185 let diff = update.diff.clone();
2186 match update.kind {
2187 StateUpdateKind::Role(_)
2188 | StateUpdateKind::RoleAuth(_)
2189 | StateUpdateKind::Database(_)
2190 | StateUpdateKind::Schema(_)
2191 | StateUpdateKind::DefaultPrivilege(_)
2192 | StateUpdateKind::SystemPrivilege(_)
2193 | StateUpdateKind::SystemConfiguration(_)
2194 | StateUpdateKind::NetworkPolicy(_) => push_update(
2195 update,
2196 diff,
2197 &mut pre_cluster_retractions,
2198 &mut pre_cluster_additions,
2199 ),
2200 StateUpdateKind::Cluster(_)
2201 | StateUpdateKind::IntrospectionSourceIndex(_)
2202 | StateUpdateKind::ClusterReplica(_) => push_update(
2203 update,
2204 diff,
2205 &mut cluster_retractions,
2206 &mut cluster_additions,
2207 ),
2208 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
2209 builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
2210 }
2211 StateUpdateKind::TemporaryItem(item) => push_update(
2212 (item, update.ts, update.diff),
2213 diff,
2214 &mut temp_item_retractions,
2215 &mut temp_item_additions,
2216 ),
2217 StateUpdateKind::Item(item) => push_update(
2218 (item, update.ts, update.diff),
2219 diff,
2220 &mut item_retractions,
2221 &mut item_additions,
2222 ),
2223 StateUpdateKind::Comment(_)
2224 | StateUpdateKind::SourceReferences(_)
2225 | StateUpdateKind::AuditLog(_)
2226 | StateUpdateKind::StorageCollectionMetadata(_)
2227 | StateUpdateKind::UnfinalizedShard(_) => push_update(
2228 update,
2229 diff,
2230 &mut post_item_retractions,
2231 &mut post_item_additions,
2232 ),
2233 }
2234 }
2235
2236 let builtin_item_updates = builtin_item_updates
2239 .into_iter()
2240 .map(|(system_object_mapping, ts, diff)| {
2241 let idx = BUILTIN_LOOKUP
2242 .get(&system_object_mapping.description)
2243 .expect("missing builtin")
2244 .0;
2245 (idx, system_object_mapping, ts, diff)
2246 })
2247 .sorted_by_key(|(idx, _, _, _)| *idx)
2248 .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2249
2250 let mut builtin_source_retractions = Vec::new();
2254 let mut builtin_source_additions = Vec::new();
2255 let mut other_builtin_retractions = Vec::new();
2256 let mut other_builtin_additions = Vec::new();
2257 for (builtin_item_update, ts, diff) in builtin_item_updates {
2258 let object_type = builtin_item_update.description.object_type;
2259 let update = StateUpdate {
2260 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2261 ts,
2262 diff,
2263 };
2264 if object_type == CatalogItemType::Source {
2265 push_update(
2266 update,
2267 diff,
2268 &mut builtin_source_retractions,
2269 &mut builtin_source_additions,
2270 );
2271 } else {
2272 push_update(
2273 update,
2274 diff,
2275 &mut other_builtin_retractions,
2276 &mut other_builtin_additions,
2277 );
2278 }
2279 }
2280
2281 fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2287 tracing::debug!(?items, "sorting items by dependencies");
2288
2289 let key_fn = |item: &(mz_catalog::durable::Item, _, _)| item.0.id;
2290 let dependencies_fn = |item: &(mz_catalog::durable::Item, _, _)| {
2291 let statement = mz_sql::parse::parse(&item.0.create_sql)
2292 .expect("valid create_sql")
2293 .into_element()
2294 .ast;
2295 mz_sql::names::dependencies(&statement).expect("failed to find dependencies of item")
2296 };
2297 sort_topological(items, key_fn, dependencies_fn);
2298 }
2299
2300 fn sort_item_updates(
2316 item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2317 ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2318 let mut types = Vec::new();
2321 let mut funcs = Vec::new();
2324 let mut secrets = Vec::new();
2325 let mut connections = Vec::new();
2326 let mut sources = Vec::new();
2327 let mut tables = Vec::new();
2328 let mut derived_items = Vec::new();
2329 let mut sinks = Vec::new();
2330 for update in item_updates {
2331 match update.0.item_type() {
2332 CatalogItemType::Type => types.push(update),
2333 CatalogItemType::Func => funcs.push(update),
2334 CatalogItemType::Secret => secrets.push(update),
2335 CatalogItemType::Connection => connections.push(update),
2336 CatalogItemType::Source => sources.push(update),
2337 CatalogItemType::Table => tables.push(update),
2338 CatalogItemType::View
2339 | CatalogItemType::MaterializedView
2340 | CatalogItemType::Index => derived_items.push(update),
2341 CatalogItemType::Sink => sinks.push(update),
2342 }
2343 }
2344
2345 sort_items_topological(&mut connections);
2349 sort_items_topological(&mut derived_items);
2350
2351 for group in [
2353 &mut types,
2354 &mut funcs,
2355 &mut secrets,
2356 &mut sources,
2357 &mut tables,
2358 &mut sinks,
2359 ] {
2360 group.sort_by_key(|(item, _, _)| item.id);
2361 }
2362
2363 iter::empty()
2364 .chain(types)
2365 .chain(funcs)
2366 .chain(secrets)
2367 .chain(connections)
2368 .chain(sources)
2369 .chain(tables)
2370 .chain(derived_items)
2371 .chain(sinks)
2372 .collect()
2373 }
2374
2375 let item_retractions = sort_item_updates(item_retractions);
2376 let item_additions = sort_item_updates(item_additions);
2377
2378 fn sort_temp_item_updates(
2382 temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2383 ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2384 let mut types = Vec::new();
2387 let mut funcs = Vec::new();
2389 let mut secrets = Vec::new();
2390 let mut connections = Vec::new();
2391 let mut sources = Vec::new();
2392 let mut tables = Vec::new();
2393 let mut derived_items = Vec::new();
2394 let mut sinks = Vec::new();
2395 for update in temp_item_updates {
2396 match update.0.item_type() {
2397 CatalogItemType::Type => types.push(update),
2398 CatalogItemType::Func => funcs.push(update),
2399 CatalogItemType::Secret => secrets.push(update),
2400 CatalogItemType::Connection => connections.push(update),
2401 CatalogItemType::Source => sources.push(update),
2402 CatalogItemType::Table => tables.push(update),
2403 CatalogItemType::View
2404 | CatalogItemType::MaterializedView
2405 | CatalogItemType::Index => derived_items.push(update),
2406 CatalogItemType::Sink => sinks.push(update),
2407 }
2408 }
2409
2410 for group in [
2412 &mut types,
2413 &mut funcs,
2414 &mut secrets,
2415 &mut connections,
2416 &mut sources,
2417 &mut tables,
2418 &mut derived_items,
2419 &mut sinks,
2420 ] {
2421 group.sort_by_key(|(item, _, _)| item.id);
2422 }
2423
2424 iter::empty()
2425 .chain(types)
2426 .chain(funcs)
2427 .chain(secrets)
2428 .chain(connections)
2429 .chain(sources)
2430 .chain(tables)
2431 .chain(derived_items)
2432 .chain(sinks)
2433 .collect()
2434 }
    // Sort both the retractions and the additions of temporary items into the
    // same fixed type-then-ID order.
    let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
    let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2437
2438 fn merge_item_updates(
2440 mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2441 mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2442 ) -> Vec<StateUpdate> {
2443 let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2444
2445 while let (Some((item, _, _)), Some((temp_item, _, _))) =
2446 (item_updates.front(), temp_item_updates.front())
2447 {
2448 if item.id < temp_item.id {
2449 let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2450 state_updates.push(StateUpdate {
2451 kind: StateUpdateKind::Item(item),
2452 ts,
2453 diff,
2454 });
2455 } else if item.id > temp_item.id {
2456 let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2457 state_updates.push(StateUpdate {
2458 kind: StateUpdateKind::TemporaryItem(temp_item),
2459 ts,
2460 diff,
2461 });
2462 } else {
2463 unreachable!(
2464 "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2465 );
2466 }
2467 }
2468
2469 while let Some((item, ts, diff)) = item_updates.pop_front() {
2470 state_updates.push(StateUpdate {
2471 kind: StateUpdateKind::Item(item),
2472 ts,
2473 diff,
2474 });
2475 }
2476
2477 while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2478 state_updates.push(StateUpdate {
2479 kind: StateUpdateKind::TemporaryItem(temp_item),
2480 ts,
2481 diff,
2482 });
2483 }
2484
2485 state_updates
2486 }
    // Interleave durable and temporary item updates into single ID-ordered
    // sequences of `StateUpdate`s.
    let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
    let item_additions = merge_item_updates(item_additions, temp_item_additions);
2489
    // Assemble the final update order: all retraction groups first, each
    // reversed relative to its addition order, followed by the addition groups
    // in forward order.
    iter::empty()
        .chain(post_item_retractions.into_iter().rev())
        .chain(item_retractions.into_iter().rev())
        .chain(other_builtin_retractions.into_iter().rev())
        .chain(cluster_retractions.into_iter().rev())
        .chain(builtin_source_retractions.into_iter().rev())
        .chain(pre_cluster_retractions.into_iter().rev())
        .chain(pre_cluster_additions)
        .chain(builtin_source_additions)
        .chain(cluster_additions)
        .chain(other_builtin_additions)
        .chain(item_additions)
        .chain(post_item_additions)
        .collect()
}
2507
/// Accumulator for a run of consecutive catalog state updates that can all be
/// applied with the same machinery; see [`ApplyState::step`].
enum ApplyState {
    /// Pending additions of builtin views, resolved to their static
    /// definitions and assigned IDs, applied via
    /// `CatalogState::parse_builtin_views`.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Pending item updates, applied with item-parsing configuration enabled.
    Items(Vec<StateUpdate>),
    /// All other pending updates, applied directly.
    Updates(Vec<StateUpdate>),
}
2524
impl ApplyState {
    /// Seeds a new accumulation state from a single update, choosing the
    /// variant that matches how `update` must be applied.
    fn new(update: StateUpdate) -> Self {
        use StateUpdateKind::*;
        match &update.kind {
            // An *addition* of a builtin view's system object mapping is
            // special-cased: the view is resolved now and later parsed via
            // `CatalogState::parse_builtin_views`.
            SystemObjectMapping(som)
                if som.description.object_type == CatalogItemType::View
                    && update.diff == StateDiff::Addition =>
            {
                let view_addition = lookup_builtin_view_addition(som.clone());
                Self::BuiltinViewAdditions(vec![view_addition])
            }

            // Item-like updates (including retractions of builtin views)
            // require item-parsing configuration when applied.
            IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
                Self::Items(vec![update])
            }

            // Everything else is applied directly.
            Role(_)
            | RoleAuth(_)
            | Database(_)
            | Schema(_)
            | DefaultPrivilege(_)
            | SystemPrivilege(_)
            | SystemConfiguration(_)
            | Cluster(_)
            | NetworkPolicy(_)
            | ClusterReplica(_)
            | SourceReferences(_)
            | Comment(_)
            | AuditLog(_)
            | StorageCollectionMetadata(_)
            | UnfinalizedShard(_) => Self::Updates(vec![update]),
        }
    }

    /// Applies the accumulated updates to `state`, returning the builtin
    /// table updates and parsed state updates produced by the application.
    ///
    /// Panics (via `expect`) if applying the updates fails, since that
    /// indicates a corrupt catalog.
    async fn apply(
        self,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
        Vec<ParsedStateUpdate>,
    ) {
        match self {
            Self::BuiltinViewAdditions(builtin_view_additions) => {
                // Temporarily enable item-parsing configuration while the
                // builtin views are parsed, then restore the previous
                // configuration from the saved `Arc`.
                let restore = Arc::clone(&state.system_configuration);
                Arc::make_mut(&mut state.system_configuration).enable_for_item_parsing();
                let builtin_table_updates = CatalogState::parse_builtin_views(
                    state,
                    builtin_view_additions,
                    retractions,
                    local_expression_cache,
                )
                .await;
                state.system_configuration = restore;
                // Builtin view parsing produces no parsed state updates.
                (builtin_table_updates, Vec::new())
            }
            Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
                state
                    .apply_updates_inner(updates, retractions, local_expression_cache)
                    .expect("corrupt catalog")
            }),
            Self::Updates(updates) => state
                .apply_updates_inner(updates, retractions, local_expression_cache)
                .expect("corrupt catalog"),
        }
    }

    /// Advances the accumulation by one update batch: if `next` holds the same
    /// variant as `self`, the two are merged and nothing is applied yet;
    /// otherwise the accumulated `self` is applied to `state` and `next`
    /// becomes the new accumulation state. Returns the new state plus any
    /// updates produced by applying `self`.
    async fn step(
        self,
        next: Self,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Self,
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
    ) {
        match (self, next) {
            (
                Self::BuiltinViewAdditions(mut builtin_view_additions),
                Self::BuiltinViewAdditions(next_builtin_view_additions),
            ) => {
                builtin_view_additions.extend(next_builtin_view_additions);
                (
                    Self::BuiltinViewAdditions(builtin_view_additions),
                    (Vec::new(), Vec::new()),
                )
            }
            (Self::Items(mut updates), Self::Items(next_updates)) => {
                updates.extend(next_updates);
                (Self::Items(updates), (Vec::new(), Vec::new()))
            }
            (Self::Updates(mut updates), Self::Updates(next_updates)) => {
                updates.extend(next_updates);
                (Self::Updates(updates), (Vec::new(), Vec::new()))
            }
            // Variant change: flush the accumulated state, then start
            // accumulating from `next`.
            (apply_state, next_apply_state) => {
                let updates = apply_state
                    .apply(state, retractions, local_expression_cache)
                    .await;
                (next_apply_state, updates)
            }
        }
    }
}
2643
/// A minimal mutable-map interface implemented for both `BTreeMap` and
/// `imbl::OrdMap`, so the `apply_*` helpers below can maintain either kind of
/// lookup map.
trait MutableMap<K, V> {
    /// Inserts `value` at `key`, returning the previously stored value, if
    /// any.
    fn insert(&mut self, key: K, value: V) -> Option<V>;
    /// Removes the value at `key`, returning it if it was present.
    fn remove(&mut self, key: &K) -> Option<V>;
}
2650
/// Forwards `MutableMap` operations to the inherent `BTreeMap` methods, using
/// fully-qualified calls to make the delegation explicit.
impl<K: Ord, V> MutableMap<K, V> for BTreeMap<K, V> {
    fn insert(&mut self, key: K, value: V) -> Option<V> {
        BTreeMap::insert(self, key, value)
    }
    fn remove(&mut self, key: &K) -> Option<V> {
        BTreeMap::remove(self, key)
    }
}
2659
/// Forwards `MutableMap` operations to the inherent `imbl::OrdMap` methods.
/// The extra `Clone` bounds come from `imbl`'s persistent-map API.
impl<K: Ord + Clone, V: Clone> MutableMap<K, V> for imbl::OrdMap<K, V> {
    fn insert(&mut self, key: K, value: V) -> Option<V> {
        imbl::OrdMap::insert(self, key, value)
    }
    fn remove(&mut self, key: &K) -> Option<V> {
        imbl::OrdMap::remove(self, key)
    }
}
2668
2669fn apply_inverted_lookup<K, V>(map: &mut impl MutableMap<K, V>, key: &K, value: V, diff: StateDiff)
2674where
2675 K: Ord + Clone + Debug,
2676 V: PartialEq + Debug,
2677{
2678 match diff {
2679 StateDiff::Retraction => {
2680 let prev = map.remove(key);
2681 assert_eq!(
2682 prev,
2683 Some(value),
2684 "retraction does not match existing value: {key:?}"
2685 );
2686 }
2687 StateDiff::Addition => {
2688 let prev = map.insert(key.clone(), value);
2689 assert_eq!(
2690 prev, None,
2691 "values must be explicitly retracted before inserting a new value: {key:?}"
2692 );
2693 }
2694 }
2695}
2696
2697fn apply_with_update<K, V, D>(
2700 map: &mut impl MutableMap<K, V>,
2701 durable: D,
2702 key_fn: impl FnOnce(&D) -> K,
2703 diff: StateDiff,
2704 retractions: &mut BTreeMap<D::Key, V>,
2705) where
2706 K: Ord,
2707 V: UpdateFrom<D> + PartialEq + Debug,
2708 D: DurableType,
2709 D::Key: Ord,
2710{
2711 match diff {
2712 StateDiff::Retraction => {
2713 let mem_key = key_fn(&durable);
2714 let value = map
2715 .remove(&mem_key)
2716 .expect("retraction does not match existing value: {key:?}");
2717 let durable_key = durable.into_key_value().0;
2718 retractions.insert(durable_key, value);
2719 }
2720 StateDiff::Addition => {
2721 let mem_key = key_fn(&durable);
2722 let durable_key = durable.key();
2723 let value = match retractions.remove(&durable_key) {
2724 Some(mut retraction) => {
2725 retraction.update_from(durable);
2726 retraction
2727 }
2728 None => durable.into(),
2729 };
2730 let prev = map.insert(mem_key, value);
2731 assert_eq!(
2732 prev, None,
2733 "values must be explicitly retracted before inserting a new value"
2734 );
2735 }
2736 }
2737}
2738
2739fn lookup_builtin_view_addition(
2741 mapping: SystemObjectMapping,
2742) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2743 let (_, builtin) = BUILTIN_LOOKUP
2744 .get(&mapping.description)
2745 .expect("missing builtin view");
2746 let Builtin::View(view) = builtin else {
2747 unreachable!("programming error, expected BuiltinView found {builtin:?}");
2748 };
2749
2750 (
2751 view,
2752 mapping.unique_identifier.catalog_id,
2753 mapping.unique_identifier.global_id,
2754 )
2755}