1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::iter;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use differential_dataflow::consolidation::consolidate_updates;
20use futures::future;
21use itertools::{Either, Itertools};
22use mz_adapter_types::connection::ConnectionId;
23use mz_catalog::SYSTEM_CONN_ID;
24use mz_catalog::builtin::{
25 BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP, Builtin, BuiltinLog, BuiltinTable, BuiltinView,
26};
27use mz_catalog::durable::objects::{
28 ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleAuthKey, RoleKey,
29 SchemaKey,
30};
31use mz_catalog::durable::{CatalogError, SystemObjectMapping};
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, Func, Index, Log, NetworkPolicy,
35 Role, RoleAuth, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table,
36 TableDataSource, TemporaryItem, Type, UpdateFrom,
37};
38use mz_compute_types::config::ComputeReplicaConfig;
39use mz_compute_types::dataflows::DataflowDescription;
40use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_ore::collections::CollectionExt;
44use mz_ore::tracing::OpenTelemetryContext;
45use mz_ore::{instrument, soft_assert_eq_or_log, soft_assert_no_log, soft_assert_or_log};
46use mz_pgrepr::oid::INVALID_OID;
47use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
48use mz_repr::role_id::RoleId;
49use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
50use mz_sql::catalog::CatalogError as SqlCatalogError;
51use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
52use mz_sql::names::{
53 FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
54 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
55};
56use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
57use mz_sql::session::vars::{VarError, VarInput};
58use mz_sql::{plan, rbac};
59use mz_sql_parser::ast::Expr;
60use mz_storage_types::sources::Timeline;
61use mz_transform::dataflow::DataflowMetainfo;
62use mz_transform::notice::OptimizerNotice;
63use tracing::{info_span, warn};
64
65use crate::AdapterError;
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{BuiltinTableUpdate, CatalogState};
68use crate::coord::catalog_implications::parsed_state_updates::{self, ParsedStateUpdate};
69use crate::util::{index_sql, sort_topological};
70
/// Retractions that are "in progress" while a single batch of catalog state
/// updates is being applied.
///
/// Retracted objects are parked here, keyed by their durable key, instead of
/// being discarded outright: an addition later in the same batch may take the
/// entry back out and reuse its in-memory state (for example, an item's
/// `referenced_by`/`used_by` sets, or a parsed `CatalogItem` when its SQL is
/// unchanged). Whatever is still here once the batch has been fully applied
/// corresponds to objects that were truly dropped; `apply_updates` uses the
/// leftover `items`/`temp_items` to clean up optimizer notices.
#[derive(Debug, Clone, Default)]
struct InProgressRetractions {
    // Each map mirrors a durable collection: durable key -> retracted
    // in-memory object.
    roles: BTreeMap<RoleKey, Role>,
    role_auths: BTreeMap<RoleAuthKey, RoleAuth>,
    databases: BTreeMap<DatabaseKey, Database>,
    schemas: BTreeMap<SchemaKey, Schema>,
    clusters: BTreeMap<ClusterKey, Cluster>,
    network_policies: BTreeMap<NetworkPolicyKey, NetworkPolicy>,
    items: BTreeMap<ItemKey, CatalogEntry>,
    // Temporary items and system objects are keyed by their item ID rather
    // than a durable key.
    temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
    introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
    system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}
95
96impl CatalogState {
    /// Applies `updates` to the in-memory catalog state.
    ///
    /// Returns the builtin table updates describing the changes along with
    /// the parsed catalog updates derived from them.
    ///
    /// Updates are first consolidated, then partitioned into groups of equal
    /// timestamps; each group is sorted into a safe application order by
    /// [`sort_updates`] and applied with its own [`InProgressRetractions`]
    /// scratch space.
    #[must_use]
    #[instrument]
    pub(crate) async fn apply_updates(
        &mut self,
        updates: Vec<StateUpdate>,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
        Vec<ParsedStateUpdate>,
    ) {
        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::with_capacity(updates.len());

        let updates = Self::consolidate_updates(updates);

        // Partition the consolidated updates by timestamp and sort each
        // same-timestamp group into an order that is safe to apply.
        let mut groups: Vec<Vec<_>> = Vec::new();
        for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
            let updates = sort_updates(updates.collect());
            groups.push(updates);
        }

        for updates in groups {
            let mut apply_state = ApplyState::Updates(Vec::new());
            let mut retractions = InProgressRetractions::default();

            // Feed each update into the `ApplyState` state machine one at a
            // time; it returns the next state plus any updates it chose to
            // flush. (The batching policy lives in `ApplyState`, defined
            // elsewhere in this file.)
            for update in updates {
                let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
                    .step(
                        ApplyState::new(update),
                        self,
                        &mut retractions,
                        local_expression_cache,
                    )
                    .await;
                apply_state = next_apply_state;
                builtin_table_updates.extend(builtin_table_update);
                catalog_updates.extend(catalog_update);
            }

            // Flush whatever the state machine is still buffering.
            let (builtin_table_update, catalog_update) = apply_state
                .apply(self, &mut retractions, local_expression_cache)
                .await;
            builtin_table_updates.extend(builtin_table_update);
            catalog_updates.extend(catalog_update);

            // Entries still parked in `retractions` were never re-added
            // within this group, i.e. they were dropped for good; clean up
            // any optimizer notices that referenced them.
            let dropped_entries: Vec<CatalogEntry> = retractions
                .items
                .into_values()
                .chain(retractions.temp_items.into_values())
                .collect();
            if !dropped_entries.is_empty() {
                let dropped_notices = self.drop_optimizer_notices(dropped_entries);
                if self.system_config().enable_mz_notices() {
                    self.pack_optimizer_notice_updates(
                        &mut builtin_table_updates,
                        dropped_notices.iter(),
                        Diff::MINUS_ONE,
                    );
                }
            }
        }

        (builtin_table_updates, catalog_updates)
    }
175
176 fn consolidate_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
184 let mut updates: Vec<(StateUpdateKind, Timestamp, mz_repr::Diff)> = updates
185 .into_iter()
186 .map(|update| (update.kind, update.ts, update.diff.into()))
187 .collect_vec();
188
189 consolidate_updates(&mut updates);
190
191 updates
192 .into_iter()
193 .map(|(kind, ts, diff)| StateUpdate {
194 kind,
195 ts,
196 diff: diff
197 .try_into()
198 .expect("catalog state cannot have diff other than -1 or 1"),
199 })
200 .collect_vec()
201 }
202
    /// Applies a batch of same-timestamp updates to the in-memory catalog
    /// state, producing the matching builtin table updates and parsed
    /// catalog updates.
    ///
    /// The per-diff ordering is deliberate: for retractions, the parsed
    /// update and builtin table update are generated *before* mutating the
    /// state (while the retracted object can still be resolved from `self`);
    /// for additions they are generated *after*, once the new object exists.
    #[instrument(level = "debug")]
    fn apply_updates_inner(
        &mut self,
        updates: Vec<StateUpdate>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        soft_assert_no_log!(
            updates.iter().map(|update| update.ts).all_equal(),
            "all timestamps should be equal: {updates:?}"
        );

        // Tracks whether this batch touched any system configuration, in
        // which case the dyncfg values are re-propagated once at the end.
        let mut update_system_config = false;

        let mut builtin_table_updates = Vec::with_capacity(updates.len());
        let mut catalog_updates = Vec::new();

        for state_update in updates {
            if matches!(state_update.kind, StateUpdateKind::SystemConfiguration(_)) {
                update_system_config = true;
            }

            match state_update.diff {
                StateDiff::Retraction => {
                    // Parse and render while the object still exists.
                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }

                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));
                    self.apply_update(
                        state_update.kind,
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                }
                StateDiff::Addition => {
                    // Apply first so parsing and rendering can see the new
                    // object.
                    self.apply_update(
                        state_update.kind.clone(),
                        state_update.diff,
                        retractions,
                        local_expression_cache,
                    )?;
                    builtin_table_updates.extend(self.generate_builtin_table_update(
                        state_update.kind.clone(),
                        state_update.diff,
                    ));

                    if let Some(update) =
                        parsed_state_updates::parse_state_update(self, state_update.clone())
                    {
                        catalog_updates.push(update);
                    }
                }
            }
        }

        // Propagate configuration changes to the dyncfg system once per
        // batch rather than per update.
        if update_system_config {
            self.system_configuration.dyncfg_updates();
        }

        Ok((builtin_table_updates, catalog_updates))
    }
287
    /// Applies a single state update to the in-memory catalog state by
    /// dispatching on the update's kind.
    ///
    /// # Panics
    ///
    /// May panic if the update is inconsistent with the current in-memory
    /// state; the kind-specific handlers assert catalog invariants (e.g.
    /// `expect("catalog out of sync")`).
    #[instrument(level = "debug")]
    fn apply_update(
        &mut self,
        kind: StateUpdateKind,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Result<(), CatalogError> {
        match kind {
            StateUpdateKind::Role(role) => {
                self.apply_role_update(role, diff, retractions);
            }
            StateUpdateKind::RoleAuth(role_auth) => {
                self.apply_role_auth_update(role_auth, diff, retractions);
            }
            StateUpdateKind::Database(database) => {
                self.apply_database_update(database, diff, retractions);
            }
            StateUpdateKind::Schema(schema) => {
                self.apply_schema_update(schema, diff, retractions);
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                self.apply_default_privilege_update(default_privilege, diff, retractions);
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                self.apply_system_privilege_update(system_privilege, diff, retractions);
            }
            StateUpdateKind::SystemConfiguration(system_configuration) => {
                self.apply_system_configuration_update(system_configuration, diff, retractions);
            }
            StateUpdateKind::Cluster(cluster) => {
                self.apply_cluster_update(cluster, diff, retractions);
            }
            StateUpdateKind::NetworkPolicy(network_policy) => {
                self.apply_network_policy_update(network_policy, diff, retractions);
            }
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.apply_introspection_source_index_update(
                    introspection_source_index,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => {
                self.apply_cluster_replica_update(cluster_replica, diff, retractions);
            }
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                self.apply_system_object_mapping_update(
                    system_object_mapping,
                    diff,
                    retractions,
                    local_expression_cache,
                );
            }
            StateUpdateKind::TemporaryItem(item) => {
                self.apply_temporary_item_update(item, diff, retractions, local_expression_cache);
            }
            StateUpdateKind::Item(item) => {
                self.apply_item_update(item, diff, retractions, local_expression_cache)?;
            }
            StateUpdateKind::Comment(comment) => {
                self.apply_comment_update(comment, diff, retractions);
            }
            StateUpdateKind::SourceReferences(source_reference) => {
                self.apply_source_references_update(source_reference, diff, retractions);
            }
            StateUpdateKind::AuditLog(_audit_log) => {
                // Audit log events are not reflected in the in-memory
                // catalog state.
            }
            StateUpdateKind::StorageCollectionMetadata(storage_collection_metadata) => {
                self.apply_storage_collection_metadata_update(
                    storage_collection_metadata,
                    diff,
                    retractions,
                );
            }
            StateUpdateKind::UnfinalizedShard(unfinalized_shard) => {
                self.apply_unfinalized_shard_update(unfinalized_shard, diff, retractions);
            }
        }

        Ok(())
    }
371
    /// Applies a role authentication update to `role_auth_by_id`, keyed by
    /// the role's ID; retractions are parked in `retractions.role_auths`.
    #[instrument(level = "debug")]
    fn apply_role_auth_update(
        &mut self,
        role_auth: mz_catalog::durable::RoleAuth,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        apply_with_update(
            &mut self.role_auth_by_id,
            role_auth,
            |role_auth| role_auth.role_id,
            diff,
            &mut retractions.role_auths,
        );
    }
387
    /// Applies a role update, keeping the `roles_by_name` index and the
    /// `roles_by_id` map in sync; retractions are parked in
    /// `retractions.roles`.
    #[instrument(level = "debug")]
    fn apply_role_update(
        &mut self,
        role: mz_catalog::durable::Role,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        apply_inverted_lookup(&mut self.roles_by_name, &role.name, role.id, diff);
        apply_with_update(
            &mut self.roles_by_id,
            role,
            |role| role.id,
            diff,
            &mut retractions.roles,
        );
    }
404
    /// Applies a database update, keeping the `database_by_name` index and
    /// the `database_by_id` map in sync; retractions are parked in
    /// `retractions.databases`.
    #[instrument(level = "debug")]
    fn apply_database_update(
        &mut self,
        database: mz_catalog::durable::Database,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        apply_inverted_lookup(
            &mut self.database_by_name,
            &database.name,
            database.id,
            diff,
        );
        apply_with_update(
            &mut self.database_by_id,
            database,
            |database| database.id,
            diff,
            &mut retractions.databases,
        );
    }
426
427 #[instrument(level = "debug")]
428 fn apply_schema_update(
429 &mut self,
430 schema: mz_catalog::durable::Schema,
431 diff: StateDiff,
432 retractions: &mut InProgressRetractions,
433 ) {
434 match &schema.database_id {
435 Some(database_id) => {
436 let db = self
437 .database_by_id
438 .get_mut(database_id)
439 .expect("catalog out of sync");
440 apply_inverted_lookup(&mut db.schemas_by_name, &schema.name, schema.id, diff);
441 apply_with_update(
442 &mut db.schemas_by_id,
443 schema,
444 |schema| schema.id,
445 diff,
446 &mut retractions.schemas,
447 );
448 }
449 None => {
450 apply_inverted_lookup(
451 &mut self.ambient_schemas_by_name,
452 &schema.name,
453 schema.id,
454 diff,
455 );
456 apply_with_update(
457 &mut self.ambient_schemas_by_id,
458 schema,
459 |schema| schema.id,
460 diff,
461 &mut retractions.schemas,
462 );
463 }
464 }
465 }
466
467 #[instrument(level = "debug")]
468 fn apply_default_privilege_update(
469 &mut self,
470 default_privilege: mz_catalog::durable::DefaultPrivilege,
471 diff: StateDiff,
472 _retractions: &mut InProgressRetractions,
473 ) {
474 match diff {
475 StateDiff::Addition => Arc::make_mut(&mut self.default_privileges)
476 .grant(default_privilege.object, default_privilege.acl_item),
477 StateDiff::Retraction => Arc::make_mut(&mut self.default_privileges)
478 .revoke(&default_privilege.object, &default_privilege.acl_item),
479 }
480 }
481
482 #[instrument(level = "debug")]
483 fn apply_system_privilege_update(
484 &mut self,
485 system_privilege: MzAclItem,
486 diff: StateDiff,
487 _retractions: &mut InProgressRetractions,
488 ) {
489 match diff {
490 StateDiff::Addition => {
491 Arc::make_mut(&mut self.system_privileges).grant(system_privilege)
492 }
493 StateDiff::Retraction => {
494 Arc::make_mut(&mut self.system_privileges).revoke(&system_privilege)
495 }
496 }
497 }
498
499 #[instrument(level = "debug")]
500 fn apply_system_configuration_update(
501 &mut self,
502 system_configuration: mz_catalog::durable::SystemConfiguration,
503 diff: StateDiff,
504 _retractions: &mut InProgressRetractions,
505 ) {
506 let res = match diff {
507 StateDiff::Addition => self.insert_system_configuration(
508 &system_configuration.name,
509 VarInput::Flat(&system_configuration.value),
510 ),
511 StateDiff::Retraction => self.remove_system_configuration(&system_configuration.name),
512 };
513 match res {
514 Ok(_) => (),
515 Err(Error {
519 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
520 }) => {
521 warn!(%name, "unknown system parameter from catalog storage");
522 }
523 Err(e) => panic!("unable to update system variable: {e:?}"),
524 }
525 }
526
    /// Applies a cluster update, keeping the `clusters_by_name` index and
    /// the `clusters_by_id` map in sync; retractions are parked in
    /// `retractions.clusters`.
    #[instrument(level = "debug")]
    fn apply_cluster_update(
        &mut self,
        cluster: mz_catalog::durable::Cluster,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        apply_inverted_lookup(&mut self.clusters_by_name, &cluster.name, cluster.id, diff);
        apply_with_update(
            &mut self.clusters_by_id,
            cluster,
            |cluster| cluster.id,
            diff,
            &mut retractions.clusters,
        );
    }
543
    /// Applies a network-policy update, keeping the
    /// `network_policies_by_name` index and the `network_policies_by_id`
    /// map in sync; retractions are parked in
    /// `retractions.network_policies`.
    #[instrument(level = "debug")]
    fn apply_network_policy_update(
        &mut self,
        policy: mz_catalog::durable::NetworkPolicy,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        apply_inverted_lookup(
            &mut self.network_policies_by_name,
            &policy.name,
            policy.id,
            diff,
        );
        apply_with_update(
            &mut self.network_policies_by_id,
            policy,
            |policy| policy.id,
            diff,
            &mut retractions.network_policies,
        );
    }
565
    /// Applies an introspection source index update.
    ///
    /// The index is tracked in two places that must stay in sync: the owning
    /// cluster's `log_indexes` map (log variant -> index ID) and the catalog
    /// entry for the index item itself.
    #[instrument(level = "debug")]
    fn apply_introspection_source_index_update(
        &mut self,
        introspection_source_index: mz_catalog::durable::IntrospectionSourceIndex,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
    ) {
        let cluster = self
            .clusters_by_id
            .get_mut(&introspection_source_index.cluster_id)
            .expect("catalog out of sync");
        // The durable record refers to the builtin log by name; resolve it
        // to the static builtin definition.
        let log = BUILTIN_LOG_LOOKUP
            .get(introspection_source_index.name.as_str())
            .expect("missing log");
        apply_inverted_lookup(
            &mut cluster.log_indexes,
            &log.variant,
            introspection_source_index.index_id,
            diff,
        );

        match diff {
            StateDiff::Addition => {
                if let Some(mut entry) = retractions
                    .introspection_source_indexes
                    .remove(&introspection_source_index.item_id)
                {
                    // The same index was retracted earlier in this batch:
                    // rebuild the index item (its index ID may have changed)
                    // while reusing the parked entry. The asserts check that
                    // the stable parts of the entry are unchanged.
                    let (index_name, index) = self.create_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.index_id,
                    );
                    assert_eq!(entry.id, introspection_source_index.item_id);
                    assert_eq!(entry.oid, introspection_source_index.oid);
                    assert_eq!(entry.name, index_name);
                    entry.item = index;
                    self.insert_entry(entry);
                } else {
                    self.insert_introspection_source_index(
                        introspection_source_index.cluster_id,
                        log,
                        introspection_source_index.item_id,
                        introspection_source_index.index_id,
                        introspection_source_index.oid,
                    );
                }
            }
            StateDiff::Retraction => {
                // Park the entry so a re-addition later in this batch can
                // reuse it.
                let entry = self.drop_item(introspection_source_index.item_id);
                retractions
                    .introspection_source_indexes
                    .insert(entry.id, entry);
            }
        }
    }
623
    /// Applies a cluster replica update to the owning cluster's replica
    /// maps.
    ///
    /// Additions concretize the replica's durable location config into a
    /// full in-memory [`ReplicaConfig`] before inserting.
    #[instrument(level = "debug")]
    fn apply_cluster_replica_update(
        &mut self,
        cluster_replica: mz_catalog::durable::ClusterReplica,
        diff: StateDiff,
        _retractions: &mut InProgressRetractions,
    ) {
        // Borrow the cluster immutably first: `concretize_replica_location`
        // needs `&self`, so the mutable lookup must happen afterwards.
        let cluster = self
            .clusters_by_id
            .get(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        let azs = cluster.availability_zones();
        let location = self
            .concretize_replica_location(cluster_replica.config.location, &vec![], azs)
            .expect("catalog in unexpected state");
        let cluster = self
            .clusters_by_id
            .get_mut(&cluster_replica.cluster_id)
            .expect("catalog out of sync");
        apply_inverted_lookup(
            &mut cluster.replica_id_by_name_,
            &cluster_replica.name,
            cluster_replica.replica_id,
            diff,
        );
        match diff {
            StateDiff::Retraction => {
                let prev = cluster.replicas_by_id_.remove(&cluster_replica.replica_id);
                assert!(
                    prev.is_some(),
                    "retraction does not match existing value: {:?}",
                    cluster_replica.replica_id
                );
            }
            StateDiff::Addition => {
                // Assemble the in-memory replica config from the durable
                // logging settings plus the concretized location.
                let logging = ReplicaLogging {
                    log_logging: cluster_replica.config.logging.log_logging,
                    interval: cluster_replica.config.logging.interval,
                };
                let config = ReplicaConfig {
                    location,
                    compute: ComputeReplicaConfig { logging },
                };
                let mem_cluster_replica = ClusterReplica {
                    name: cluster_replica.name.clone(),
                    cluster_id: cluster_replica.cluster_id,
                    replica_id: cluster_replica.replica_id,
                    config,
                    owner_id: cluster_replica.owner_id,
                };
                let prev = cluster
                    .replicas_by_id_
                    .insert(cluster_replica.replica_id, mem_cluster_replica);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value: {:?}",
                    cluster_replica.replica_id
                );
            }
        }
    }
685
    /// Applies an addition or retraction of a durable builtin-object mapping
    /// by installing or dropping the corresponding builtin item in memory.
    ///
    /// For additions, the builtin definition is looked up in
    /// [`BUILTIN_LOOKUP`] and materialized into a [`CatalogItem`] of the
    /// appropriate flavor; SQL-defined builtins (indexes, materialized
    /// views, continual tasks, connections) are parsed from their create
    /// SQL and panic on failure, since builtin definitions must be valid.
    #[instrument(level = "debug")]
    fn apply_system_object_mapping_update(
        &mut self,
        system_object_mapping: mz_catalog::durable::SystemObjectMapping,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        let item_id = system_object_mapping.unique_identifier.catalog_id;
        let global_id = system_object_mapping.unique_identifier.global_id;

        // NOTE(review): runtime-alterable builtins appear to be handled via
        // a different code path; all we can tell here is that they are
        // deliberately skipped — confirm against the callers.
        if system_object_mapping.unique_identifier.runtime_alterable() {
            return;
        }

        if let StateDiff::Retraction = diff {
            // Park the dropped entry so a re-addition within this batch can
            // restore it wholesale.
            let entry = self.drop_item(item_id);
            retractions.system_object_mappings.insert(item_id, entry);
            return;
        }

        // Addition. If the same item was retracted earlier in this batch,
        // restore the parked entry as-is instead of rebuilding it.
        if let Some(entry) = retractions.system_object_mappings.remove(&item_id) {
            self.insert_entry(entry);
            return;
        }

        let builtin = BUILTIN_LOOKUP
            .get(&system_object_mapping.description)
            .expect("missing builtin")
            .1;
        let schema_name = builtin.schema();
        let schema_id = self
            .ambient_schemas_by_name
            .get(schema_name)
            .unwrap_or_else(|| panic!("unknown ambient schema: {schema_name}"));
        // Builtins always live in an ambient schema.
        let name = QualifiedItemName {
            qualifiers: ItemQualifiers {
                database_spec: ResolvedDatabaseSpecifier::Ambient,
                schema_spec: SchemaSpecifier::Id(*schema_id),
            },
            item: builtin.name().into(),
        };
        match builtin {
            Builtin::Log(log) => {
                // Logs are owned by the system role and carry the statically
                // declared access privileges.
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&log.access);
                self.insert_item(
                    item_id,
                    log.oid,
                    name.clone(),
                    CatalogItem::Log(Log {
                        variant: log.variant,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }

            Builtin::Table(table) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Table,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&table.access);

                self.insert_item(
                    item_id,
                    table.oid,
                    name.clone(),
                    CatalogItem::Table(Table {
                        create_sql: None,
                        desc: VersionedRelationDesc::new(table.desc.clone()),
                        // A builtin table starts with a single collection at
                        // the root relation version.
                        collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                        conn_id: None,
                        resolved_ids: ResolvedIds::empty(),
                        // Retained-metrics objects get a custom compaction
                        // window derived from the configured retention.
                        custom_logical_compaction_window: table.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: table.is_retained_metrics_object,
                        data_source: TableDataSource::TableWrites {
                            defaults: vec![Expr::null(); table.desc.arity()],
                        },
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::Index(index) => {
                let custom_logical_compaction_window =
                    index.is_retained_metrics_object.then(|| {
                        self.system_config()
                            .metrics_retention()
                            .try_into()
                            .expect("invalid metrics retention")
                    });
                // Builtin indexes have no historical versions.
                let versions = BTreeMap::new();

                let item = self
                    .parse_item(
                        global_id,
                        &index.create_sql(),
                        &versions,
                        None,
                        index.is_retained_metrics_object,
                        custom_logical_compaction_window,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap index:\n\
                            {}\n\
                            error:\n\
                            {:?}\n\n\
                            make sure that the schema name is specified in the builtin index's create sql statement.",
                            index.name, e
                        )
                    });
                let CatalogItem::Index(_) = item else {
                    panic!(
                        "internal error: builtin index {}'s SQL does not begin with \"CREATE INDEX\".",
                        index.name
                    );
                };

                self.insert_item(
                    item_id,
                    index.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }
            Builtin::View(_) => {
                // Builtin views never reach this code path.
                unreachable!("views added elsewhere");
            }

            Builtin::Type(typ) => {
                let typ = self.resolve_builtin_type_references(typ);
                if let CatalogType::Array { element_reference } = typ.details.typ {
                    // Back-link the element type to this array type.
                    let entry = self.get_entry_mut(&element_reference);
                    let item_type = match &mut entry.item {
                        CatalogItem::Type(item_type) => item_type,
                        _ => unreachable!("types can only reference other types"),
                    };
                    item_type.details.array_id = Some(item_id);
                }

                let schema_id = self.resolve_system_schema(typ.schema);

                self.insert_item(
                    item_id,
                    typ.oid,
                    QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(schema_id),
                        },
                        item: typ.name.to_owned(),
                    },
                    CatalogItem::Type(Type {
                        create_sql: None,
                        global_id,
                        details: typ.details.clone(),
                        resolved_ids: ResolvedIds::empty(),
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(vec![
                        rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Type),
                        rbac::owner_privilege(mz_sql::catalog::ObjectType::Type, MZ_SYSTEM_ROLE_ID),
                    ]),
                );
            }

            Builtin::Func(func) => {
                // NOTE(review): functions are inserted with an invalid OID
                // here; presumably real OIDs are assigned elsewhere —
                // confirm before relying on `oid` for builtin functions.
                let oid = INVALID_OID;
                self.insert_item(
                    item_id,
                    oid,
                    name.clone(),
                    CatalogItem::Func(Func {
                        inner: func.inner,
                        global_id,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::default(),
                );
            }

            Builtin::Source(coll) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&coll.access);

                self.insert_item(
                    item_id,
                    coll.oid,
                    name.clone(),
                    CatalogItem::Source(Source {
                        create_sql: None,
                        data_source: coll.data_source.clone(),
                        desc: coll.desc.clone(),
                        global_id,
                        timeline: Timeline::EpochMilliseconds,
                        resolved_ids: ResolvedIds::empty(),
                        custom_logical_compaction_window: coll.is_retained_metrics_object.then(
                            || {
                                self.system_config()
                                    .metrics_retention()
                                    .try_into()
                                    .expect("invalid metrics retention")
                            },
                        ),
                        is_retained_metrics_object: coll.is_retained_metrics_object,
                    }),
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::MaterializedView(mv) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::MaterializedView,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&mv.access);

                let custom_logical_compaction_window = mv.is_retained_metrics_object.then(|| {
                    self.system_config()
                        .metrics_retention()
                        .try_into()
                        .expect("invalid metrics retention")
                });

                // Builtin materialized views have no historical versions.
                let versions = BTreeMap::new();

                let mut item = self
                    .parse_item(
                        global_id,
                        &mv.create_sql(),
                        &versions,
                        None,
                        mv.is_retained_metrics_object,
                        custom_logical_compaction_window,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap materialized view:\n\
                            {}\n\
                            error:\n\
                            {e:?}\n\n\
                            make sure that the schema name is specified in the builtin \
                            materialized view's create sql statement.",
                            mv.name,
                        )
                    });
                let CatalogItem::MaterializedView(catalog_mv) = &mut item else {
                    panic!(
                        "internal error: builtin materialized view {}'s SQL does not begin \
                        with \"CREATE MATERIALIZED VIEW\".",
                        mv.name,
                    );
                };

                // Propagate the unique keys declared on the builtin's static
                // desc onto the parsed item's versioned desc.
                let mut desc = catalog_mv.desc.latest();
                for key in &mv.desc.typ().keys {
                    desc = desc.with_key(key.clone());
                }
                catalog_mv.desc = VersionedRelationDesc::new(desc);

                self.insert_item(
                    item_id,
                    mv.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::ContinualTask(ct) => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Source,
                    MZ_SYSTEM_ROLE_ID,
                )];
                acl_items.extend_from_slice(&ct.access);
                // Builtin continual tasks have no historical versions.
                let versions = BTreeMap::new();

                let item = self
                    .parse_item(
                        global_id,
                        &ct.create_sql(),
                        &versions,
                        None,
                        false,
                        None,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap continual task:\n\
                            {}\n\
                            error:\n\
                            {:?}\n\n\
                            make sure that the schema name is specified in the builtin continual task's create sql statement.",
                            ct.name, e
                        )
                    });
                let CatalogItem::ContinualTask(_) = &item else {
                    panic!(
                        "internal error: builtin continual task {}'s SQL does not begin with \"CREATE CONTINUAL TASK\".",
                        ct.name
                    );
                };

                self.insert_item(
                    item_id,
                    ct.oid,
                    name,
                    item,
                    MZ_SYSTEM_ROLE_ID,
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
            Builtin::Connection(connection) => {
                // Builtin connections have no historical versions.
                let versions = BTreeMap::new();
                let mut item = self
                    .parse_item(
                        global_id,
                        connection.sql,
                        &versions,
                        None,
                        false,
                        None,
                        local_expression_cache,
                        None,
                    )
                    .unwrap_or_else(|e| {
                        panic!(
                            "internal error: failed to load bootstrap connection:\n\
                            {}\n\
                            error:\n\
                            {:?}\n\n\
                            make sure that the schema name is specified in the builtin connection's create sql statement.",
                            connection.name, e
                        )
                    });
                let CatalogItem::Connection(_) = &mut item else {
                    panic!(
                        "internal error: builtin connection {}'s SQL does not begin with \"CREATE CONNECTION\".",
                        connection.name
                    );
                };

                // Unlike other builtins, connections carry their own owner
                // rather than defaulting to the system role.
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    connection.owner_id.clone(),
                )];
                acl_items.extend_from_slice(connection.access);

                self.insert_item(
                    item_id,
                    connection.oid,
                    name.clone(),
                    item,
                    connection.owner_id.clone(),
                    PrivilegeMap::from_mz_acl_items(acl_items),
                );
            }
        }
    }
1089
    /// Applies a temporary-item addition or retraction to the in-memory
    /// catalog.
    ///
    /// On addition, the owning connection's temporary schema is created on
    /// demand. If the same item was retracted earlier in this batch, its
    /// entry is reused and the item is only re-deserialized when the
    /// `create_sql` actually changed.
    #[instrument(level = "debug")]
    fn apply_temporary_item_update(
        &mut self,
        temporary_item: TemporaryItem,
        diff: StateDiff,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) {
        match diff {
            StateDiff::Addition => {
                let TemporaryItem {
                    id,
                    oid,
                    global_id,
                    schema_id,
                    name,
                    conn_id,
                    create_sql,
                    owner_id,
                    privileges,
                    extra_versions,
                } = temporary_item;
                // Temporary items are scoped to a connection; lazily create
                // that connection's temporary schema for its first item.
                let temp_conn_id = conn_id
                    .as_ref()
                    .expect("temporary items must have a connection id");
                if !self.temporary_schemas.contains_key(temp_conn_id) {
                    self.create_temporary_schema(temp_conn_id, owner_id)
                        .expect("failed to create temporary schema");
                }
                let schema = self.find_temp_schema(&schema_id);
                let name = QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: schema.database().clone(),
                        schema_spec: schema.id().clone(),
                    },
                    item: name.clone(),
                };

                let entry = match retractions.temp_items.remove(&id) {
                    Some(mut retraction) => {
                        assert_eq!(retraction.id, id);

                        // Only re-deserialize when the SQL actually changed;
                        // otherwise the previously parsed item is kept.
                        if retraction.create_sql() != create_sql {
                            let mut catalog_item = self
                                .deserialize_item(
                                    global_id,
                                    &create_sql,
                                    &extra_versions,
                                    local_expression_cache,
                                    Some(retraction.item),
                                )
                                .unwrap_or_else(|e| {
                                    panic!("{e:?}: invalid persisted SQL: {create_sql}")
                                });
                            // Restore the owning connection on the freshly
                            // parsed item.
                            catalog_item.set_conn_id(conn_id);
                            retraction.item = catalog_item;
                        }

                        // Refresh the entry's metadata from the new durable
                        // record.
                        retraction.id = id;
                        retraction.oid = oid;
                        retraction.name = name;
                        retraction.owner_id = owner_id;
                        retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);
                        retraction
                    }
                    None => {
                        let mut catalog_item = self
                            .deserialize_item(
                                global_id,
                                &create_sql,
                                &extra_versions,
                                local_expression_cache,
                                None,
                            )
                            .unwrap_or_else(|e| {
                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
                            });

                        // Restore the owning connection on the freshly
                        // parsed item.
                        catalog_item.set_conn_id(conn_id);

                        CatalogEntry {
                            item: catalog_item,
                            referenced_by: Vec::new(),
                            used_by: Vec::new(),
                            id,
                            oid,
                            name,
                            owner_id,
                            privileges: PrivilegeMap::from_mz_acl_items(privileges),
                        }
                    }
                };
                self.insert_entry(entry);
            }
            StateDiff::Retraction => {
                // Park the dropped entry so a re-addition in this batch can
                // reuse it.
                let entry = self.drop_item(temporary_item.id);
                retractions.temp_items.insert(temporary_item.id, entry);
            }
        }
    }
1207
1208 #[instrument(level = "debug")]
1209 fn apply_item_update(
1210 &mut self,
1211 item: mz_catalog::durable::Item,
1212 diff: StateDiff,
1213 retractions: &mut InProgressRetractions,
1214 local_expression_cache: &mut LocalExpressionCache,
1215 ) -> Result<(), CatalogError> {
1216 match diff {
1217 StateDiff::Addition => {
1218 let key = item.key();
1219 let mz_catalog::durable::Item {
1220 id,
1221 oid,
1222 global_id,
1223 schema_id,
1224 name,
1225 create_sql,
1226 owner_id,
1227 privileges,
1228 extra_versions,
1229 } = item;
1230 let schema = self.find_non_temp_schema(&schema_id);
1231 let name = QualifiedItemName {
1232 qualifiers: ItemQualifiers {
1233 database_spec: schema.database().clone(),
1234 schema_spec: schema.id().clone(),
1235 },
1236 item: name.clone(),
1237 };
1238 let entry = match retractions.items.remove(&key) {
1239 Some(retraction) => {
1240 assert_eq!(retraction.id, item.id);
1241
1242 let item = self
1243 .deserialize_item(
1244 global_id,
1245 &create_sql,
1246 &extra_versions,
1247 local_expression_cache,
1248 Some(retraction.item),
1249 )
1250 .unwrap_or_else(|e| {
1251 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1252 });
1253
1254 CatalogEntry {
1255 item,
1256 id,
1257 oid,
1258 name,
1259 owner_id,
1260 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1261 referenced_by: retraction.referenced_by,
1262 used_by: retraction.used_by,
1263 }
1264 }
1265 None => {
1266 let catalog_item = self
1267 .deserialize_item(
1268 global_id,
1269 &create_sql,
1270 &extra_versions,
1271 local_expression_cache,
1272 None,
1273 )
1274 .unwrap_or_else(|e| {
1275 panic!("{e:?}: invalid persisted SQL: {create_sql}")
1276 });
1277 CatalogEntry {
1278 item: catalog_item,
1279 referenced_by: Vec::new(),
1280 used_by: Vec::new(),
1281 id,
1282 oid,
1283 name,
1284 owner_id,
1285 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1286 }
1287 }
1288 };
1289
1290 self.insert_entry(entry);
1291 }
1292 StateDiff::Retraction => {
1293 let entry = self.drop_item(item.id);
1294 let key = item.into_key_value().0;
1295 retractions.items.insert(key, entry);
1296 }
1297 }
1298 Ok(())
1299 }
1300
1301 #[instrument(level = "debug")]
1302 fn apply_comment_update(
1303 &mut self,
1304 comment: mz_catalog::durable::Comment,
1305 diff: StateDiff,
1306 _retractions: &mut InProgressRetractions,
1307 ) {
1308 match diff {
1309 StateDiff::Addition => {
1310 let prev = Arc::make_mut(&mut self.comments).update_comment(
1311 comment.object_id,
1312 comment.sub_component,
1313 Some(comment.comment),
1314 );
1315 assert_eq!(
1316 prev, None,
1317 "values must be explicitly retracted before inserting a new value"
1318 );
1319 }
1320 StateDiff::Retraction => {
1321 let prev = Arc::make_mut(&mut self.comments).update_comment(
1322 comment.object_id,
1323 comment.sub_component,
1324 None,
1325 );
1326 assert_eq!(
1327 prev,
1328 Some(comment.comment),
1329 "retraction does not match existing value: ({:?}, {:?})",
1330 comment.object_id,
1331 comment.sub_component,
1332 );
1333 }
1334 }
1335 }
1336
1337 #[instrument(level = "debug")]
1338 fn apply_source_references_update(
1339 &mut self,
1340 source_references: mz_catalog::durable::SourceReferences,
1341 diff: StateDiff,
1342 _retractions: &mut InProgressRetractions,
1343 ) {
1344 match diff {
1345 StateDiff::Addition => {
1346 let prev = self
1347 .source_references
1348 .insert(source_references.source_id, source_references.into());
1349 assert!(
1350 prev.is_none(),
1351 "values must be explicitly retracted before inserting a new value: {prev:?}"
1352 );
1353 }
1354 StateDiff::Retraction => {
1355 let prev = self.source_references.remove(&source_references.source_id);
1356 assert!(
1357 prev.is_some(),
1358 "retraction for a non-existent existing value: {source_references:?}"
1359 );
1360 }
1361 }
1362 }
1363
1364 #[instrument(level = "debug")]
1365 fn apply_storage_collection_metadata_update(
1366 &mut self,
1367 storage_collection_metadata: mz_catalog::durable::StorageCollectionMetadata,
1368 diff: StateDiff,
1369 _retractions: &mut InProgressRetractions,
1370 ) {
1371 apply_inverted_lookup(
1372 &mut Arc::make_mut(&mut self.storage_metadata).collection_metadata,
1373 &storage_collection_metadata.id,
1374 storage_collection_metadata.shard,
1375 diff,
1376 );
1377 }
1378
1379 #[instrument(level = "debug")]
1380 fn apply_unfinalized_shard_update(
1381 &mut self,
1382 unfinalized_shard: mz_catalog::durable::UnfinalizedShard,
1383 diff: StateDiff,
1384 _retractions: &mut InProgressRetractions,
1385 ) {
1386 match diff {
1387 StateDiff::Addition => {
1388 let newly_inserted = Arc::make_mut(&mut self.storage_metadata)
1389 .unfinalized_shards
1390 .insert(unfinalized_shard.shard);
1391 assert!(
1392 newly_inserted,
1393 "values must be explicitly retracted before inserting a new value: {unfinalized_shard:?}",
1394 );
1395 }
1396 StateDiff::Retraction => {
1397 let removed = Arc::make_mut(&mut self.storage_metadata)
1398 .unfinalized_shards
1399 .remove(&unfinalized_shard.shard);
1400 assert!(
1401 removed,
1402 "retraction does not match existing value: {unfinalized_shard:?}"
1403 );
1404 }
1405 }
1406 }
1407
1408 #[instrument]
1411 pub(crate) fn generate_builtin_table_updates(
1412 &self,
1413 updates: Vec<StateUpdate>,
1414 ) -> Vec<BuiltinTableUpdate> {
1415 let mut builtin_table_updates = Vec::new();
1416 for StateUpdate { kind, ts: _, diff } in updates {
1417 let builtin_table_update = self.generate_builtin_table_update(kind, diff);
1418 let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
1419 builtin_table_updates.extend(builtin_table_update);
1420 }
1421 builtin_table_updates
1422 }
1423
    /// Generates the builtin table updates that correspond to a single durable
    /// state update, e.g. rows for `mz_roles` or `mz_clusters`.
    ///
    /// Update kinds that produce no builtin table rows return an empty `Vec`.
    #[instrument(level = "debug")]
    pub(crate) fn generate_builtin_table_update(
        &self,
        kind: StateUpdateKind,
        diff: StateDiff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        // Convert the state diff into the row-diff representation expected by
        // the `pack_*` helpers.
        let diff = diff.into();
        match kind {
            StateUpdateKind::Role(role) => self.pack_role_update(role.id, diff),
            StateUpdateKind::RoleAuth(role_auth) => {
                vec![self.pack_role_auth_update(role_auth.role_id, diff)]
            }
            StateUpdateKind::DefaultPrivilege(default_privilege) => {
                vec![self.pack_default_privileges_update(
                    &default_privilege.object,
                    &default_privilege.acl_item.grantee,
                    &default_privilege.acl_item.acl_mode,
                    diff,
                )]
            }
            StateUpdateKind::SystemPrivilege(system_privilege) => {
                vec![self.pack_system_privileges_update(system_privilege, diff)]
            }
            StateUpdateKind::SystemConfiguration(_) => Vec::new(),
            StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
            StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
                self.pack_item_update(introspection_source_index.item_id, diff)
            }
            StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
                cluster_replica.cluster_id,
                &cluster_replica.name,
                diff,
            ),
            StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
                // Runtime-alterable system objects produce no rows here;
                // presumably their rows come from their item updates instead —
                // NOTE(review): confirm against the code emitting these updates.
                if !system_object_mapping.unique_identifier.runtime_alterable() {
                    self.pack_item_update(system_object_mapping.unique_identifier.catalog_id, diff)
                } else {
                    vec![]
                }
            }
            StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
            StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
            StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
                comment.object_id,
                comment.sub_component,
                &comment.comment,
                diff,
            )],
            StateUpdateKind::SourceReferences(source_references) => {
                self.pack_source_references_update(&source_references, diff)
            }
            StateUpdateKind::AuditLog(audit_log) => {
                vec![
                    self.pack_audit_log_update(&audit_log.event, diff)
                        .expect("could not pack audit log update"),
                ]
            }
            // These kinds are not reflected in any builtin table.
            StateUpdateKind::Database(_)
            | StateUpdateKind::Schema(_)
            | StateUpdateKind::NetworkPolicy(_)
            | StateUpdateKind::StorageCollectionMetadata(_)
            | StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
        }
    }
1493
1494 fn get_entry_mut(&mut self, id: &CatalogItemId) -> &mut CatalogEntry {
1495 self.entry_by_id
1496 .get_mut(id)
1497 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id}"))
1498 }
1499
1500 pub(super) fn set_optimized_plan(
1506 &mut self,
1507 id: GlobalId,
1508 plan: DataflowDescription<mz_expr::OptimizedMirRelationExpr>,
1509 ) {
1510 let item_id = self.entry_by_global_id[&id];
1511 let entry = self.get_entry_mut(&item_id);
1512 match entry.item_mut() {
1513 CatalogItem::Index(idx) => idx.optimized_plan = Some(Arc::new(plan)),
1514 CatalogItem::MaterializedView(mv) => mv.optimized_plan = Some(Arc::new(plan)),
1515 CatalogItem::ContinualTask(ct) => ct.optimized_plan = Some(Arc::new(plan)),
1516 other => panic!("set_optimized_plan called on {} ({:?})", id, other.typ()),
1517 }
1518 }
1519
1520 pub(super) fn set_physical_plan(
1526 &mut self,
1527 id: GlobalId,
1528 plan: DataflowDescription<mz_compute_types::plan::Plan>,
1529 ) {
1530 let item_id = self.entry_by_global_id[&id];
1531 let entry = self.get_entry_mut(&item_id);
1532 match entry.item_mut() {
1533 CatalogItem::Index(idx) => idx.physical_plan = Some(Arc::new(plan)),
1534 CatalogItem::MaterializedView(mv) => mv.physical_plan = Some(Arc::new(plan)),
1535 CatalogItem::ContinualTask(ct) => ct.physical_plan = Some(Arc::new(plan)),
1536 other => panic!("set_physical_plan called on {} ({:?})", id, other.typ()),
1537 }
1538 }
1539
1540 pub(super) fn set_dataflow_metainfo(
1546 &mut self,
1547 id: GlobalId,
1548 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
1549 ) {
1550 for notice in metainfo.optimizer_notices.iter() {
1552 for dep_id in notice.dependencies.iter() {
1553 self.notices_by_dep_id
1554 .entry(*dep_id)
1555 .or_default()
1556 .push(Arc::clone(notice));
1557 }
1558 if let Some(item_id) = notice.item_id {
1559 soft_assert_eq_or_log!(
1560 item_id,
1561 id,
1562 "notice.item_id should match the id for whom we are saving the notice"
1563 );
1564 }
1565 }
1566 let item_id = self.entry_by_global_id[&id];
1568 let entry = self.get_entry_mut(&item_id);
1569 match entry.item_mut() {
1570 CatalogItem::Index(idx) => idx.dataflow_metainfo = Some(metainfo),
1571 CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo = Some(metainfo),
1572 CatalogItem::ContinualTask(ct) => ct.dataflow_metainfo = Some(metainfo),
1573 other => panic!("set_dataflow_metainfo called on {} ({:?})", id, other.typ()),
1574 }
1575 }
1576
    /// Removes the dataflow metainfo (and with it all optimizer notices) from
    /// `dropped_entries`, and scrubs `notices_by_dep_id` so that no index entry
    /// points at a dropped item or a dropped notice.
    ///
    /// Returns the set of notices that were dropped so the caller can retract
    /// them wherever else they are surfaced.
    #[mz_ore::instrument(level = "trace")]
    pub(super) fn drop_optimizer_notices(
        &mut self,
        dropped_entries: Vec<CatalogEntry>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        let mut dropped_notices = BTreeSet::new();
        // All `GlobalId`s belonging to the dropped entries.
        let mut drop_ids = BTreeSet::new();

        // Phase 1: take each dropped entry's notices and unlink them from the
        // per-dependency index.
        for mut entry in dropped_entries {
            drop_ids.extend(entry.global_ids());
            let metainfo = match entry.item_mut() {
                CatalogItem::Index(idx) => idx.dataflow_metainfo.take(),
                CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.take(),
                CatalogItem::ContinualTask(ct) => ct.dataflow_metainfo.take(),
                // Other item types carry no dataflow metainfo.
                _ => None,
            };
            if let Some(mut metainfo) = metainfo {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by \
                     `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    // Remove the notice from every dependency that indexes it,
                    // pruning now-empty index entries.
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.notices_by_dep_id.get_mut(dep_id) {
                            notices.retain(|x| &n != x);
                            if notices.is_empty() {
                                self.notices_by_dep_id.remove(dep_id);
                            }
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Phase 2: drop index entries keyed by the dropped ids themselves, and
        // scrub each such notice from the metainfo of the (still live) item the
        // notice is attached to.
        for id in &drop_ids {
            if let Some(notices) = self.notices_by_dep_id.remove(id) {
                for n in notices.into_iter() {
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(entry) = self.try_get_entry_by_global_id(item_id) {
                            let catalog_item_id = entry.id();
                            let entry = self.get_entry_mut(&catalog_item_id);
                            let item = entry.item_mut();
                            match item {
                                CatalogItem::Index(idx) => {
                                    if let Some(ref mut m) = idx.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                CatalogItem::MaterializedView(mv) => {
                                    if let Some(ref mut m) = mv.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                CatalogItem::ContinualTask(ct) => {
                                    if let Some(ref mut m) = ct.dataflow_metainfo {
                                        m.optimizer_notices.retain(|x| &n != x);
                                    }
                                }
                                _ => {}
                            }
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Phase 3: for dependencies that are *not* being dropped, remove the
        // dropped notices from their index entries, pruning empty ones.
        let todo_dep_ids: BTreeSet<GlobalId> = dropped_notices
            .iter()
            .flat_map(|n| n.dependencies.iter())
            .filter(|dep_id| !drop_ids.contains(dep_id))
            .copied()
            .collect();
        for id in todo_dep_ids {
            if let Some(notices) = self.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n));
                if notices.is_empty() {
                    self.notices_by_dep_id.remove(&id);
                }
            }
        }

        dropped_notices
    }
1685
1686 fn get_schema_mut(
1687 &mut self,
1688 database_spec: &ResolvedDatabaseSpecifier,
1689 schema_spec: &SchemaSpecifier,
1690 conn_id: &ConnectionId,
1691 ) -> &mut Schema {
1692 match (database_spec, schema_spec) {
1694 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => self
1695 .temporary_schemas
1696 .get_mut(conn_id)
1697 .expect("catalog out of sync"),
1698 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => self
1699 .ambient_schemas_by_id
1700 .get_mut(id)
1701 .expect("catalog out of sync"),
1702 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1703 .database_by_id
1704 .get_mut(database_id)
1705 .expect("catalog out of sync")
1706 .schemas_by_id
1707 .get_mut(schema_id)
1708 .expect("catalog out of sync"),
1709 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1710 unreachable!("temporary schemas are in the ambient database")
1711 }
1712 }
1713 }
1714
    /// Parses and installs the given builtin views, parallelizing the parsing
    /// on blocking tasks and retrying views whose dependencies had not yet been
    /// installed when they were first attempted.
    ///
    /// Views whose item ID has a pending retraction in `retractions` are
    /// treated as updates and reuse the retracted entry instead of re-parsing.
    /// Returns builtin table updates for every view handled.
    ///
    /// Panics if any view fails to parse for a reason other than a missing
    /// dependency or an invalid cast.
    #[instrument(name = "catalog::parse_views")]
    async fn parse_builtin_views(
        state: &mut CatalogState,
        builtin_views: Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut builtin_table_updates = Vec::with_capacity(builtin_views.len());
        // Split views into updates (a matching retraction exists: reinstall the
        // retracted entry) and fresh additions (must be parsed from SQL).
        let (updates, additions): (Vec<_>, Vec<_>) =
            builtin_views
                .into_iter()
                .partition_map(|(view, item_id, gid)| {
                    match retractions.system_object_mappings.remove(&item_id) {
                        Some(entry) => Either::Left(entry),
                        None => Either::Right((view, item_id, gid)),
                    }
                });

        for entry in updates {
            let item_id = entry.id();
            state.insert_entry(entry);
            builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
        }

        // Scheduling state for the parse loop below:
        // - `handles`: in-flight blocking parse tasks.
        // - `awaiting_id_dependencies` / `awaiting_name_dependencies`: views
        //   parked until a specific dependency (by ID or by name) completes.
        // - `awaiting_all`: views parked until everything currently in flight
        //   has completed (used for `InvalidCast` errors).
        let mut handles = Vec::new();
        let mut awaiting_id_dependencies: BTreeMap<CatalogItemId, Vec<CatalogItemId>> =
            BTreeMap::new();
        let mut awaiting_name_dependencies: BTreeMap<String, Vec<CatalogItemId>> = BTreeMap::new();
        let mut awaiting_all = Vec::new();
        let mut completed_ids: BTreeSet<CatalogItemId> = BTreeSet::new();
        let mut completed_names: BTreeSet<String> = BTreeSet::new();

        // Views still to be installed, keyed by item ID.
        let mut views: BTreeMap<CatalogItemId, (&BuiltinView, GlobalId)> = additions
            .into_iter()
            .map(|(view, item_id, gid)| (item_id, (view, gid)))
            .collect();
        let item_ids: Vec<_> = views.keys().copied().collect();

        let mut ready: VecDeque<CatalogItemId> = views.keys().cloned().collect();
        while !handles.is_empty() || !ready.is_empty() || !awaiting_all.is_empty() {
            // Nothing in flight and nothing ready: release the views that were
            // waiting for "everything else" to finish.
            if handles.is_empty() && ready.is_empty() {
                ready.extend(awaiting_all.drain(..));
            }

            if !ready.is_empty() {
                // Snapshot the current catalog state so parse tasks can run on
                // blocking threads without borrowing `state`.
                let spawn_state = Arc::new(state.clone());
                while let Some(id) = ready.pop_front() {
                    let (view, global_id) = views.get(&id).expect("must exist");
                    let global_id = *global_id;
                    let create_sql = view.create_sql();
                    let versions = BTreeMap::new();

                    // Give each task its own span, parented to the current
                    // OpenTelemetry context rather than the loop's span.
                    let span = info_span!(parent: None, "parse builtin view", name = view.name);
                    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
                    let task_state = Arc::clone(&spawn_state);
                    let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
                    let handle = mz_ore::task::spawn_blocking(
                        || "parse view",
                        move || {
                            span.in_scope(|| {
                                let res = task_state.parse_item_inner(
                                    global_id,
                                    &create_sql,
                                    &versions,
                                    None,
                                    false,
                                    None,
                                    cached_expr,
                                    None,
                                );
                                (id, global_id, res)
                            })
                        },
                    );
                    handles.push(handle);
                }
            }

            // Wait for any one task to finish and process its result.
            let (selected, _idx, remaining) = future::select_all(handles).await;
            handles = remaining;
            let (id, global_id, res) = selected;
            // On failure, return the cached expression (if any) so a retry can
            // reuse it.
            let mut insert_cached_expr = |cached_expr| {
                if let Some(cached_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, cached_expr);
                }
            };
            match res {
                Ok((item, uncached_expr)) => {
                    if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                        local_expression_cache.insert_uncached_expression(
                            global_id,
                            uncached_expr,
                            optimizer_features,
                        );
                    }
                    // Install the parsed view under its ambient schema.
                    let (view, _gid) = views.remove(&id).expect("must exist");
                    let schema_id = state
                        .ambient_schemas_by_name
                        .get(view.schema)
                        .unwrap_or_else(|| panic!("unknown ambient schema: {}", view.schema));
                    let qname = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: SchemaSpecifier::Id(*schema_id),
                        },
                        item: view.name.into(),
                    };
                    let mut acl_items = vec![rbac::owner_privilege(
                        mz_sql::catalog::ObjectType::View,
                        MZ_SYSTEM_ROLE_ID,
                    )];
                    acl_items.extend_from_slice(&view.access);

                    state.insert_item(
                        id,
                        view.oid,
                        qname,
                        item,
                        MZ_SYSTEM_ROLE_ID,
                        PrivilegeMap::from_mz_acl_items(acl_items),
                    );

                    // Wake any views that were waiting on this one, whether
                    // they were parked by ID or by name.
                    let mut resolved_dependent_items = Vec::new();
                    if let Some(dependent_items) = awaiting_id_dependencies.remove(&id) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    let entry = state.get_entry(&id);
                    let full_name = state.resolve_full_name(entry.name(), None).to_string();
                    if let Some(dependent_items) = awaiting_name_dependencies.remove(&full_name) {
                        resolved_dependent_items.extend(dependent_items);
                    }
                    ready.extend(resolved_dependent_items);

                    completed_ids.insert(id);
                    completed_names.insert(full_name);
                }
                // Missing dependency reported by ID: retry immediately if it
                // has since completed, otherwise park until it does.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidId(missing_dep)),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    if completed_ids.contains(&missing_dep) {
                        ready.push_back(id);
                    } else {
                        awaiting_id_dependencies
                            .entry(missing_dep)
                            .or_default()
                            .push(id);
                    }
                }
                // Missing dependency reported by name; the name may actually
                // be a stringified item ID, so try that interpretation first.
                Err((
                    AdapterError::PlanError(plan::PlanError::Catalog(
                        SqlCatalogError::UnknownItem(missing_dep),
                    )),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    match CatalogItemId::from_str(&missing_dep) {
                        Ok(missing_dep) => {
                            if completed_ids.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_id_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                        Err(_) => {
                            if completed_names.contains(&missing_dep) {
                                ready.push_back(id);
                            } else {
                                awaiting_name_dependencies
                                    .entry(missing_dep)
                                    .or_default()
                                    .push(id);
                            }
                        }
                    }
                }
                // Invalid cast: park until everything else has settled, then
                // retry once.
                Err((
                    AdapterError::PlanError(plan::PlanError::InvalidCast { .. }),
                    cached_expr,
                )) => {
                    insert_cached_expr(cached_expr);
                    awaiting_all.push(id);
                }
                // Any other error is a bug in the builtin view definition.
                Err((e, _)) => {
                    let (bad_view, _gid) = views.get(&id).expect("must exist");
                    panic!(
                        "internal error: failed to load bootstrap view:\n\
                        {name}\n\
                        error:\n\
                        {e:?}\n\n\
                        Make sure that the schema name is specified in the builtin view's create sql statement.
                        ",
                        name = bad_view.name,
                    )
                }
            }
        }

        // Every view must have been installed; nothing may still be parked.
        assert!(awaiting_id_dependencies.is_empty());
        assert!(
            awaiting_name_dependencies.is_empty(),
            "awaiting_name_dependencies: {awaiting_name_dependencies:?}"
        );
        assert!(awaiting_all.is_empty());
        assert!(views.is_empty());

        builtin_table_updates.extend(
            item_ids
                .into_iter()
                .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
        );

        builtin_table_updates
    }
1959
1960 fn insert_entry(&mut self, entry: CatalogEntry) {
1962 if !entry.id.is_system() {
1963 if let Some(cluster_id) = entry.item.cluster_id() {
1964 self.clusters_by_id
1965 .get_mut(&cluster_id)
1966 .expect("catalog out of sync")
1967 .bound_objects
1968 .insert(entry.id);
1969 };
1970 }
1971
1972 for u in entry.references().items() {
1973 match self.entry_by_id.get_mut(u) {
1974 Some(metadata) => metadata.referenced_by.push(entry.id()),
1975 None => panic!(
1976 "Catalog: missing dependent catalog item {} while installing {}",
1977 &u,
1978 self.resolve_full_name(entry.name(), entry.conn_id())
1979 ),
1980 }
1981 }
1982 for u in entry.uses() {
1983 if u == entry.id() {
1986 continue;
1987 }
1988 match self.entry_by_id.get_mut(&u) {
1989 Some(metadata) => metadata.used_by.push(entry.id()),
1990 None => panic!(
1991 "Catalog: missing dependent catalog item {} while installing {}",
1992 &u,
1993 self.resolve_full_name(entry.name(), entry.conn_id())
1994 ),
1995 }
1996 }
1997 for gid in entry.item.global_ids() {
1998 self.entry_by_global_id.insert(gid, entry.id());
1999 }
2000 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
2001 if entry.name().qualifiers.schema_spec == SchemaSpecifier::Temporary
2004 && !self.temporary_schemas.contains_key(conn_id)
2005 {
2006 self.create_temporary_schema(conn_id, entry.owner_id)
2007 .expect("failed to create temporary schema");
2008 }
2009 let schema = self.get_schema_mut(
2010 &entry.name().qualifiers.database_spec,
2011 &entry.name().qualifiers.schema_spec,
2012 conn_id,
2013 );
2014
2015 let prev_id = match entry.item() {
2016 CatalogItem::Func(_) => schema
2017 .functions
2018 .insert(entry.name().item.clone(), entry.id()),
2019 CatalogItem::Type(_) => schema.types.insert(entry.name().item.clone(), entry.id()),
2020 _ => schema.items.insert(entry.name().item.clone(), entry.id()),
2021 };
2022
2023 assert!(
2024 prev_id.is_none(),
2025 "builtin name collision on {:?}",
2026 entry.name().item.clone()
2027 );
2028
2029 self.entry_by_id.insert(entry.id(), entry.clone());
2030 }
2031
2032 fn insert_item(
2034 &mut self,
2035 id: CatalogItemId,
2036 oid: u32,
2037 name: QualifiedItemName,
2038 item: CatalogItem,
2039 owner_id: RoleId,
2040 privileges: PrivilegeMap,
2041 ) {
2042 let entry = CatalogEntry {
2043 item,
2044 name,
2045 id,
2046 oid,
2047 used_by: Vec::new(),
2048 referenced_by: Vec::new(),
2049 owner_id,
2050 privileges,
2051 };
2052
2053 self.insert_entry(entry);
2054 }
2055
    /// Removes the entry for `id` from the catalog, unwinding the bookkeeping
    /// performed by `insert_entry`: reverse dependency edges, the global ID
    /// index, the schema namespace entry, and the cluster's bound-object set.
    ///
    /// Returns the removed entry. Panics if any of the catalog's internal
    /// indexes disagree ("catalog out of sync").
    #[mz_ore::instrument(level = "trace")]
    fn drop_item(&mut self, id: CatalogItemId) -> CatalogEntry {
        let metadata = self.entry_by_id.remove(&id).expect("catalog out of sync");
        // Unlink the reverse edges recorded on referenced/used items.
        for u in metadata.references().items() {
            if let Some(dep_metadata) = self.entry_by_id.get_mut(u) {
                dep_metadata.referenced_by.retain(|u| *u != metadata.id())
            }
        }
        for u in metadata.uses() {
            if let Some(dep_metadata) = self.entry_by_id.get_mut(&u) {
                dep_metadata.used_by.retain(|u| *u != metadata.id())
            }
        }
        // Drop every global ID of the item from the index.
        for gid in metadata.global_ids() {
            self.entry_by_global_id.remove(&gid);
        }

        let conn_id = metadata.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema = self.get_schema_mut(
            &metadata.name().qualifiers.database_spec,
            &metadata.name().qualifiers.schema_spec,
            conn_id,
        );
        // Types live in their own namespace within the schema; funcs are never
        // dropped through this path (asserted below).
        if metadata.item_type() == CatalogItemType::Type {
            schema
                .types
                .remove(&metadata.name().item)
                .expect("catalog out of sync");
        } else {
            assert_ne!(metadata.item_type(), CatalogItemType::Func);

            schema
                .items
                .remove(&metadata.name().item)
                .expect("catalog out of sync");
        };

        // Non-system items must have been tracked as bound objects on their
        // cluster (mirrors `insert_entry`).
        if !id.is_system() {
            if let Some(cluster_id) = metadata.item().cluster_id() {
                assert!(
                    self.clusters_by_id
                        .get_mut(&cluster_id)
                        .expect("catalog out of sync")
                        .bound_objects
                        .remove(&id),
                    "catalog out of sync"
                );
            }
        }

        metadata
    }
2110
2111 fn insert_introspection_source_index(
2112 &mut self,
2113 cluster_id: ClusterId,
2114 log: &'static BuiltinLog,
2115 item_id: CatalogItemId,
2116 global_id: GlobalId,
2117 oid: u32,
2118 ) {
2119 let (index_name, index) =
2120 self.create_introspection_source_index(cluster_id, log, global_id);
2121 self.insert_item(
2122 item_id,
2123 oid,
2124 index_name,
2125 index,
2126 MZ_SYSTEM_ROLE_ID,
2127 PrivilegeMap::default(),
2128 );
2129 }
2130
2131 fn create_introspection_source_index(
2132 &self,
2133 cluster_id: ClusterId,
2134 log: &'static BuiltinLog,
2135 global_id: GlobalId,
2136 ) -> (QualifiedItemName, CatalogItem) {
2137 let source_name = FullItemName {
2138 database: RawDatabaseSpecifier::Ambient,
2139 schema: log.schema.into(),
2140 item: log.name.into(),
2141 };
2142 let index_name = format!("{}_{}_primary_idx", log.name, cluster_id);
2143 let mut index_name = QualifiedItemName {
2144 qualifiers: ItemQualifiers {
2145 database_spec: ResolvedDatabaseSpecifier::Ambient,
2146 schema_spec: SchemaSpecifier::Id(self.get_mz_introspection_schema_id()),
2147 },
2148 item: index_name.clone(),
2149 };
2150 index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
2151 let index_item_name = index_name.item.clone();
2152 let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
2153 let index = CatalogItem::Index(Index {
2154 global_id,
2155 on: log_global_id,
2156 keys: log
2157 .variant
2158 .index_by()
2159 .into_iter()
2160 .map(MirScalarExpr::column)
2161 .collect(),
2162 create_sql: index_sql(
2163 index_item_name,
2164 cluster_id,
2165 source_name,
2166 &log.variant.desc(),
2167 &log.variant.index_by(),
2168 ),
2169 conn_id: None,
2170 resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
2171 cluster_id,
2172 is_retained_metrics_object: false,
2173 custom_logical_compaction_window: None,
2174 optimized_plan: None,
2175 physical_plan: None,
2176 dataflow_metainfo: None,
2177 });
2178 (index_name, index)
2179 }
2180
2181 fn insert_system_configuration(&mut self, name: &str, value: VarInput) -> Result<bool, Error> {
2186 Ok(Arc::make_mut(&mut self.system_configuration).set(name, value)?)
2187 }
2188
2189 fn remove_system_configuration(&mut self, name: &str) -> Result<bool, Error> {
2194 Ok(Arc::make_mut(&mut self.system_configuration).reset(name)?)
2195 }
2196}
2197
2198fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
2206 fn push_update<T>(
2207 update: T,
2208 diff: StateDiff,
2209 retractions: &mut Vec<T>,
2210 additions: &mut Vec<T>,
2211 ) {
2212 match diff {
2213 StateDiff::Retraction => retractions.push(update),
2214 StateDiff::Addition => additions.push(update),
2215 }
2216 }
2217
2218 soft_assert_no_log!(
2219 updates.iter().map(|update| update.ts).all_equal(),
2220 "all timestamps should be equal: {updates:?}"
2221 );
2222 soft_assert_no_log!(
2223 {
2224 let mut dedup = BTreeSet::new();
2225 updates.iter().all(|update| dedup.insert(&update.kind))
2226 },
2227 "updates should be consolidated: {updates:?}"
2228 );
2229
2230 let mut pre_cluster_retractions = Vec::new();
2232 let mut pre_cluster_additions = Vec::new();
2233 let mut cluster_retractions = Vec::new();
2234 let mut cluster_additions = Vec::new();
2235 let mut builtin_item_updates = Vec::new();
2236 let mut item_retractions = Vec::new();
2237 let mut item_additions = Vec::new();
2238 let mut temp_item_retractions = Vec::new();
2239 let mut temp_item_additions = Vec::new();
2240 let mut post_item_retractions = Vec::new();
2241 let mut post_item_additions = Vec::new();
2242 for update in updates {
2243 let diff = update.diff.clone();
2244 match update.kind {
2245 StateUpdateKind::Role(_)
2246 | StateUpdateKind::RoleAuth(_)
2247 | StateUpdateKind::Database(_)
2248 | StateUpdateKind::Schema(_)
2249 | StateUpdateKind::DefaultPrivilege(_)
2250 | StateUpdateKind::SystemPrivilege(_)
2251 | StateUpdateKind::SystemConfiguration(_)
2252 | StateUpdateKind::NetworkPolicy(_) => push_update(
2253 update,
2254 diff,
2255 &mut pre_cluster_retractions,
2256 &mut pre_cluster_additions,
2257 ),
2258 StateUpdateKind::Cluster(_)
2259 | StateUpdateKind::IntrospectionSourceIndex(_)
2260 | StateUpdateKind::ClusterReplica(_) => push_update(
2261 update,
2262 diff,
2263 &mut cluster_retractions,
2264 &mut cluster_additions,
2265 ),
2266 StateUpdateKind::SystemObjectMapping(system_object_mapping) => {
2267 builtin_item_updates.push((system_object_mapping, update.ts, update.diff))
2268 }
2269 StateUpdateKind::TemporaryItem(item) => push_update(
2270 (item, update.ts, update.diff),
2271 diff,
2272 &mut temp_item_retractions,
2273 &mut temp_item_additions,
2274 ),
2275 StateUpdateKind::Item(item) => push_update(
2276 (item, update.ts, update.diff),
2277 diff,
2278 &mut item_retractions,
2279 &mut item_additions,
2280 ),
2281 StateUpdateKind::Comment(_)
2282 | StateUpdateKind::SourceReferences(_)
2283 | StateUpdateKind::AuditLog(_)
2284 | StateUpdateKind::StorageCollectionMetadata(_)
2285 | StateUpdateKind::UnfinalizedShard(_) => push_update(
2286 update,
2287 diff,
2288 &mut post_item_retractions,
2289 &mut post_item_additions,
2290 ),
2291 }
2292 }
2293
2294 let builtin_item_updates = builtin_item_updates
2297 .into_iter()
2298 .map(|(system_object_mapping, ts, diff)| {
2299 let idx = BUILTIN_LOOKUP
2300 .get(&system_object_mapping.description)
2301 .expect("missing builtin")
2302 .0;
2303 (idx, system_object_mapping, ts, diff)
2304 })
2305 .sorted_by_key(|(idx, _, _, _)| *idx)
2306 .map(|(_, system_object_mapping, ts, diff)| (system_object_mapping, ts, diff));
2307
2308 let mut builtin_source_retractions = Vec::new();
2312 let mut builtin_source_additions = Vec::new();
2313 let mut other_builtin_retractions = Vec::new();
2314 let mut other_builtin_additions = Vec::new();
2315 for (builtin_item_update, ts, diff) in builtin_item_updates {
2316 let object_type = builtin_item_update.description.object_type;
2317 let update = StateUpdate {
2318 kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
2319 ts,
2320 diff,
2321 };
2322 if object_type == CatalogItemType::Source {
2323 push_update(
2324 update,
2325 diff,
2326 &mut builtin_source_retractions,
2327 &mut builtin_source_additions,
2328 );
2329 } else {
2330 push_update(
2331 update,
2332 diff,
2333 &mut other_builtin_retractions,
2334 &mut other_builtin_additions,
2335 );
2336 }
2337 }
2338
2339 fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2345 tracing::debug!(?items, "sorting items by dependencies");
2346
2347 let key_fn = |item: &(mz_catalog::durable::Item, _, _)| item.0.id;
2348 let dependencies_fn = |item: &(mz_catalog::durable::Item, _, _)| {
2349 let statement = mz_sql::parse::parse(&item.0.create_sql)
2350 .expect("valid create_sql")
2351 .into_element()
2352 .ast;
2353 mz_sql::names::dependencies(&statement).expect("failed to find dependencies of item")
2354 };
2355 sort_topological(items, key_fn, dependencies_fn);
2356 }
2357
2358 fn sort_item_updates(
2374 item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2375 ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
2376 let mut types = Vec::new();
2379 let mut funcs = Vec::new();
2382 let mut secrets = Vec::new();
2383 let mut connections = Vec::new();
2384 let mut sources = Vec::new();
2385 let mut tables = Vec::new();
2386 let mut derived_items = Vec::new();
2387 let mut sinks = Vec::new();
2388 let mut continual_tasks = Vec::new();
2389
2390 for update in item_updates {
2391 match update.0.item_type() {
2392 CatalogItemType::Type => types.push(update),
2393 CatalogItemType::Func => funcs.push(update),
2394 CatalogItemType::Secret => secrets.push(update),
2395 CatalogItemType::Connection => connections.push(update),
2396 CatalogItemType::Source => sources.push(update),
2397 CatalogItemType::Table => tables.push(update),
2398 CatalogItemType::View
2399 | CatalogItemType::MaterializedView
2400 | CatalogItemType::Index => derived_items.push(update),
2401 CatalogItemType::Sink => sinks.push(update),
2402 CatalogItemType::ContinualTask => continual_tasks.push(update),
2403 }
2404 }
2405
2406 sort_items_topological(&mut connections);
2410 sort_items_topological(&mut derived_items);
2411
2412 for group in [
2414 &mut types,
2415 &mut funcs,
2416 &mut secrets,
2417 &mut sources,
2418 &mut tables,
2419 &mut sinks,
2420 &mut continual_tasks,
2421 ] {
2422 group.sort_by_key(|(item, _, _)| item.id);
2423 }
2424
2425 iter::empty()
2426 .chain(types)
2427 .chain(funcs)
2428 .chain(secrets)
2429 .chain(connections)
2430 .chain(sources)
2431 .chain(tables)
2432 .chain(derived_items)
2433 .chain(sinks)
2434 .chain(continual_tasks)
2435 .collect()
2436 }
2437
2438 let item_retractions = sort_item_updates(item_retractions);
2439 let item_additions = sort_item_updates(item_additions);
2440
2441 fn sort_temp_item_updates(
2445 temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
2446 ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
2447 let mut types = Vec::new();
2450 let mut funcs = Vec::new();
2452 let mut secrets = Vec::new();
2453 let mut connections = Vec::new();
2454 let mut sources = Vec::new();
2455 let mut tables = Vec::new();
2456 let mut derived_items = Vec::new();
2457 let mut sinks = Vec::new();
2458 let mut continual_tasks = Vec::new();
2459
2460 for update in temp_item_updates {
2461 match update.0.item_type() {
2462 CatalogItemType::Type => types.push(update),
2463 CatalogItemType::Func => funcs.push(update),
2464 CatalogItemType::Secret => secrets.push(update),
2465 CatalogItemType::Connection => connections.push(update),
2466 CatalogItemType::Source => sources.push(update),
2467 CatalogItemType::Table => tables.push(update),
2468 CatalogItemType::View
2469 | CatalogItemType::MaterializedView
2470 | CatalogItemType::Index => derived_items.push(update),
2471 CatalogItemType::Sink => sinks.push(update),
2472 CatalogItemType::ContinualTask => continual_tasks.push(update),
2473 }
2474 }
2475
2476 for group in [
2478 &mut types,
2479 &mut funcs,
2480 &mut secrets,
2481 &mut connections,
2482 &mut sources,
2483 &mut tables,
2484 &mut derived_items,
2485 &mut sinks,
2486 &mut continual_tasks,
2487 ] {
2488 group.sort_by_key(|(item, _, _)| item.id);
2489 }
2490
2491 iter::empty()
2492 .chain(types)
2493 .chain(funcs)
2494 .chain(secrets)
2495 .chain(connections)
2496 .chain(sources)
2497 .chain(tables)
2498 .chain(derived_items)
2499 .chain(sinks)
2500 .chain(continual_tasks)
2501 .collect()
2502 }
2503 let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
2504 let temp_item_additions = sort_temp_item_updates(temp_item_additions);
2505
2506 fn merge_item_updates(
2508 mut item_updates: VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
2509 mut temp_item_updates: VecDeque<(TemporaryItem, Timestamp, StateDiff)>,
2510 ) -> Vec<StateUpdate> {
2511 let mut state_updates = Vec::with_capacity(item_updates.len() + temp_item_updates.len());
2512
2513 while let (Some((item, _, _)), Some((temp_item, _, _))) =
2514 (item_updates.front(), temp_item_updates.front())
2515 {
2516 if item.id < temp_item.id {
2517 let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
2518 state_updates.push(StateUpdate {
2519 kind: StateUpdateKind::Item(item),
2520 ts,
2521 diff,
2522 });
2523 } else if item.id > temp_item.id {
2524 let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
2525 state_updates.push(StateUpdate {
2526 kind: StateUpdateKind::TemporaryItem(temp_item),
2527 ts,
2528 diff,
2529 });
2530 } else {
2531 unreachable!(
2532 "two items cannot have the same ID: item={item:?}, temp_item={temp_item:?}"
2533 );
2534 }
2535 }
2536
2537 while let Some((item, ts, diff)) = item_updates.pop_front() {
2538 state_updates.push(StateUpdate {
2539 kind: StateUpdateKind::Item(item),
2540 ts,
2541 diff,
2542 });
2543 }
2544
2545 while let Some((temp_item, ts, diff)) = temp_item_updates.pop_front() {
2546 state_updates.push(StateUpdate {
2547 kind: StateUpdateKind::TemporaryItem(temp_item),
2548 ts,
2549 diff,
2550 });
2551 }
2552
2553 state_updates
2554 }
2555 let item_retractions = merge_item_updates(item_retractions, temp_item_retractions);
2556 let item_additions = merge_item_updates(item_additions, temp_item_additions);
2557
2558 iter::empty()
2560 .chain(post_item_retractions.into_iter().rev())
2562 .chain(item_retractions.into_iter().rev())
2563 .chain(other_builtin_retractions.into_iter().rev())
2564 .chain(cluster_retractions.into_iter().rev())
2565 .chain(builtin_source_retractions.into_iter().rev())
2566 .chain(pre_cluster_retractions.into_iter().rev())
2567 .chain(pre_cluster_additions)
2568 .chain(builtin_source_additions)
2569 .chain(cluster_additions)
2570 .chain(other_builtin_additions)
2571 .chain(item_additions)
2572 .chain(post_item_additions)
2573 .collect()
2574}
2575
/// A batch of catalog state updates that are all applied with the same
/// mechanism.
///
/// Consecutive updates of the same variant are accumulated (see
/// [`ApplyState::step`]) so they can be applied together in a single call.
enum ApplyState {
    /// Additions of builtin views, parsed in bulk via
    /// `CatalogState::parse_builtin_views`.
    BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
    /// Item updates, applied with item-parsing configuration enabled.
    Items(Vec<StateUpdate>),
    /// All other state updates, applied directly.
    Updates(Vec<StateUpdate>),
}
2592
impl ApplyState {
    /// Classifies a single update into the [`ApplyState`] variant that is
    /// able to apply it.
    fn new(update: StateUpdate) -> Self {
        use StateUpdateKind::*;
        match &update.kind {
            // Additions of builtin views get special treatment: they are
            // collected so they can be parsed in bulk.
            SystemObjectMapping(som)
                if som.description.object_type == CatalogItemType::View
                    && update.diff == StateDiff::Addition =>
            {
                let view_addition = lookup_builtin_view_addition(som.clone());
                Self::BuiltinViewAdditions(vec![view_addition])
            }

            // Item-like updates. `SystemObjectMapping` updates that did not
            // match the guard above (non-views, or view retractions) land
            // here.
            IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
                Self::Items(vec![update])
            }

            // Every remaining update kind is applied directly, with no
            // item-parsing special casing.
            Role(_)
            | RoleAuth(_)
            | Database(_)
            | Schema(_)
            | DefaultPrivilege(_)
            | SystemPrivilege(_)
            | SystemConfiguration(_)
            | Cluster(_)
            | NetworkPolicy(_)
            | ClusterReplica(_)
            | SourceReferences(_)
            | Comment(_)
            | AuditLog(_)
            | StorageCollectionMetadata(_)
            | UnfinalizedShard(_) => Self::Updates(vec![update]),
        }
    }

    /// Applies the accumulated batch of updates to `state`, returning the
    /// resulting builtin table updates and parsed state updates.
    async fn apply(
        self,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
        Vec<ParsedStateUpdate>,
    ) {
        match self {
            Self::BuiltinViewAdditions(builtin_view_additions) => {
                // Temporarily switch the system configuration into
                // item-parsing mode while the builtin views are parsed, then
                // restore the previous configuration.
                let restore = Arc::clone(&state.system_configuration);
                Arc::make_mut(&mut state.system_configuration).enable_for_item_parsing();
                let builtin_table_updates = CatalogState::parse_builtin_views(
                    state,
                    builtin_view_additions,
                    retractions,
                    local_expression_cache,
                )
                .await;
                state.system_configuration = restore;
                (builtin_table_updates, Vec::new())
            }
            // Items are applied with item-parsing configuration enabled.
            Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
                state
                    .apply_updates_inner(updates, retractions, local_expression_cache)
                    .expect("corrupt catalog")
            }),
            // Plain updates are applied directly.
            Self::Updates(updates) => state
                .apply_updates_inner(updates, retractions, local_expression_cache)
                .expect("corrupt catalog"),
        }
    }

    /// Advances the batching state machine by one step: if `next` is the same
    /// variant as `self`, the batches are merged and nothing is applied yet;
    /// otherwise `self` is flushed to `state` and `next` becomes the current
    /// batch.
    async fn step(
        self,
        next: Self,
        state: &mut CatalogState,
        retractions: &mut InProgressRetractions,
        local_expression_cache: &mut LocalExpressionCache,
    ) -> (
        Self,
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
    ) {
        match (self, next) {
            (
                Self::BuiltinViewAdditions(mut builtin_view_additions),
                Self::BuiltinViewAdditions(next_builtin_view_additions),
            ) => {
                // Same variant: keep accumulating, nothing to apply yet.
                builtin_view_additions.extend(next_builtin_view_additions);
                (
                    Self::BuiltinViewAdditions(builtin_view_additions),
                    (Vec::new(), Vec::new()),
                )
            }
            (Self::Items(mut updates), Self::Items(next_updates)) => {
                updates.extend(next_updates);
                (Self::Items(updates), (Vec::new(), Vec::new()))
            }
            (Self::Updates(mut updates), Self::Updates(next_updates)) => {
                updates.extend(next_updates);
                (Self::Updates(updates), (Vec::new(), Vec::new()))
            }
            // Variant change: flush the current batch and start a new one.
            (apply_state, next_apply_state) => {
                let updates = apply_state
                    .apply(state, retractions, local_expression_cache)
                    .await;
                (next_apply_state, updates)
            }
        }
    }
}
2711
/// The common mutation interface of the map types used to hold catalog state;
/// implemented below for both `BTreeMap` and `imbl::OrdMap`.
trait MutableMap<K, V> {
    /// Inserts `value` at `key`, returning the previously stored value, if
    /// any.
    fn insert(&mut self, key: K, value: V) -> Option<V>;
    /// Removes the value stored at `key`, returning it if it was present.
    fn remove(&mut self, key: &K) -> Option<V>;
}
2718
2719impl<K: Ord, V> MutableMap<K, V> for BTreeMap<K, V> {
2720 fn insert(&mut self, key: K, value: V) -> Option<V> {
2721 BTreeMap::insert(self, key, value)
2722 }
2723 fn remove(&mut self, key: &K) -> Option<V> {
2724 BTreeMap::remove(self, key)
2725 }
2726}
2727
2728impl<K: Ord + Clone, V: Clone> MutableMap<K, V> for imbl::OrdMap<K, V> {
2729 fn insert(&mut self, key: K, value: V) -> Option<V> {
2730 imbl::OrdMap::insert(self, key, value)
2731 }
2732 fn remove(&mut self, key: &K) -> Option<V> {
2733 imbl::OrdMap::remove(self, key)
2734 }
2735}
2736
2737fn apply_inverted_lookup<K, V>(map: &mut impl MutableMap<K, V>, key: &K, value: V, diff: StateDiff)
2742where
2743 K: Ord + Clone + Debug,
2744 V: PartialEq + Debug,
2745{
2746 match diff {
2747 StateDiff::Retraction => {
2748 let prev = map.remove(key);
2749 assert_eq!(
2750 prev,
2751 Some(value),
2752 "retraction does not match existing value: {key:?}"
2753 );
2754 }
2755 StateDiff::Addition => {
2756 let prev = map.insert(key.clone(), value);
2757 assert_eq!(
2758 prev, None,
2759 "values must be explicitly retracted before inserting a new value: {key:?}"
2760 );
2761 }
2762 }
2763}
2764
2765fn apply_with_update<K, V, D>(
2768 map: &mut impl MutableMap<K, V>,
2769 durable: D,
2770 key_fn: impl FnOnce(&D) -> K,
2771 diff: StateDiff,
2772 retractions: &mut BTreeMap<D::Key, V>,
2773) where
2774 K: Ord,
2775 V: UpdateFrom<D> + PartialEq + Debug,
2776 D: DurableType,
2777 D::Key: Ord,
2778{
2779 match diff {
2780 StateDiff::Retraction => {
2781 let mem_key = key_fn(&durable);
2782 let value = map
2783 .remove(&mem_key)
2784 .expect("retraction does not match existing value: {key:?}");
2785 let durable_key = durable.into_key_value().0;
2786 retractions.insert(durable_key, value);
2787 }
2788 StateDiff::Addition => {
2789 let mem_key = key_fn(&durable);
2790 let durable_key = durable.key();
2791 let value = match retractions.remove(&durable_key) {
2792 Some(mut retraction) => {
2793 retraction.update_from(durable);
2794 retraction
2795 }
2796 None => durable.into(),
2797 };
2798 let prev = map.insert(mem_key, value);
2799 assert_eq!(
2800 prev, None,
2801 "values must be explicitly retracted before inserting a new value"
2802 );
2803 }
2804 }
2805}
2806
2807fn lookup_builtin_view_addition(
2809 mapping: SystemObjectMapping,
2810) -> (&'static BuiltinView, CatalogItemId, GlobalId) {
2811 let (_, builtin) = BUILTIN_LOOKUP
2812 .get(&mapping.description)
2813 .expect("missing builtin view");
2814 let Builtin::View(view) = builtin else {
2815 unreachable!("programming error, expected BuiltinView found {builtin:?}");
2816 };
2817
2818 (
2819 view,
2820 mapping.unique_identifier.catalog_id,
2821 mapping.unique_identifier.global_id,
2822 )
2823}