1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35 StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50 CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
51 CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
52 RoleAttributesRaw, RoleMembership, RoleVars,
53};
54use mz_sql::names::{
55 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use tracing::{info, trace};
66
67use crate::AdapterError;
68use crate::catalog::state::LocalExpressionCache;
69use crate::catalog::{
70 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
71 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
72 is_reserved_role_name, object_type_to_audit_object_type,
73 system_object_type_to_audit_object_type,
74};
75use crate::coord::ConnMeta;
76use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
77use crate::coord::cluster_scheduling::SchedulingDecision;
78use crate::util::ResultExt;
79
/// A single catalog mutation.
///
/// A list of `Op`s is handed to [`Catalog::transact`] (or
/// [`Catalog::transact_incremental_dry_run`]); the ops are then processed one at a time, in
/// order, against the catalog transaction.
#[derive(Debug, Clone)]
pub enum Op {
    /// Changes the retain-history setting of item `id`.
    ///
    /// `value` is the new setting as recorded in the audit log and `window` is handed to the
    /// item together with it. System items are rejected as read-only.
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Changes the timestamp interval of item `id`. System items are rejected as read-only.
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        value: Option<Value>,
        interval: Duration,
    },
    /// Replaces the attributes and `vars` of role `id`.
    ///
    /// When `nopassword` is set the password is cleared; otherwise a password present in
    /// `attributes` is set, and an absent one leaves the stored password unchanged.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    /// Replaces the rules of network policy `id`.
    ///
    /// `name` is checked against reserved names and recorded in the audit log.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds a column to table `id`; `new_global_id` is recorded as the collection for the
    /// resulting table version.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    /// Applies materialized view `replacement_id` as the replacement of materialized view `id`.
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    /// Creates a user database together with its default schema.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Creates a user schema in the given database. Reserved schema names and the ambient
    /// database are rejected.
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Creates a user role. Reserved role names are rejected.
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    /// Creates a user cluster. Reserved cluster names are rejected.
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Creates a replica in cluster `cluster_id`. Reserved replica names are rejected.
    ///
    /// `reason` is written to the audit log entry for managed replicas.
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops every listed object.
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    // NOTE(review): presumably the op that yields the single "weird" builtin table update
    // returned by `transact_op` — confirm against its handler.
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
}
246
/// Identifies one object to drop as part of [`Op::DropObjects`].
///
/// The variants line up one-to-one with [`ObjectId`]; the only extra payload is the
/// [`ReplicaCreateDropReason`] carried by `ClusterReplica`.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// A cluster replica, together with the reason it is being dropped.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
260
261impl DropObjectInfo {
262 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
265 match id {
266 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
267 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
268 cluster_id,
269 replica_id,
270 ReplicaCreateDropReason::Manual,
271 )),
272 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
273 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
274 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
275 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
276 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
277 }
278 }
279
280 fn to_object_id(&self) -> ObjectId {
283 match &self {
284 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
285 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
286 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
287 }
288 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
289 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
290 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
291 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
292 DropObjectInfo::NetworkPolicy(network_policy_id) => {
293 ObjectId::NetworkPolicy(network_policy_id.clone())
294 }
295 }
296 }
297}
298
/// Why a cluster replica is being created or dropped.
///
/// Converted into its audit-log representation by
/// [`ReplicaCreateDropReason::into_audit_log`].
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// Reported as `Manual` in the audit log, with no scheduling details.
    Manual,
    /// Reported as `Schedule` in the audit log, together with the reasons derived from the
    /// contained scheduling decisions.
    ClusterScheduling(Vec<SchedulingDecision>),
}
311
312impl ReplicaCreateDropReason {
313 pub fn into_audit_log(
314 self,
315 ) -> (
316 CreateOrDropClusterReplicaReasonV1,
317 Option<SchedulingDecisionsWithReasonsV2>,
318 ) {
319 let (reason, scheduling_policies) = match self {
320 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
321 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
322 CreateOrDropClusterReplicaReasonV1::Schedule,
323 Some(scheduling_decisions),
324 ),
325 };
326 (
327 reason,
328 scheduling_policies
329 .as_ref()
330 .map(SchedulingDecision::reasons_to_audit_log_reasons),
331 )
332 }
333}
334
/// Everything a successful [`Catalog::transact`] call hands back to its caller.
pub struct TransactionResult {
    /// Builtin table updates produced while applying the transaction.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog state updates produced while applying the transaction.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit log events recorded by the ops of the transaction.
    pub audit_events: Vec<VersionedEvent>,
}
341
/// Selects how `Catalog::transact_inner` treats storage state.
#[derive(Debug, Clone, Copy)]
enum TransactInnerMode {
    /// Storage state is prepared through the storage collections handle, when one is given.
    Commit,
    /// No storage state is prepared; no storage collections handle may be passed
    /// (checked with a `debug_assert!`).
    DryRun,
}
356
357impl Catalog {
358 fn should_audit_log_item(item: &CatalogItem) -> bool {
359 !item.is_temporary()
360 }
361
362 fn temporary_ids(
365 &self,
366 ops: &[Op],
367 temporary_drops: BTreeSet<(&ConnectionId, String)>,
368 ) -> Result<BTreeSet<CatalogItemId>, Error> {
369 let mut creating = BTreeSet::new();
370 let mut temporary_ids = BTreeSet::new();
371 for op in ops.iter() {
372 if let Op::CreateItem {
373 id,
374 name,
375 item,
376 owner_id: _,
377 } = op
378 {
379 if let Some(conn_id) = item.conn_id() {
380 if self.item_exists_in_temp_schemas(conn_id, &name.item)
381 && !temporary_drops.contains(&(conn_id, name.item.clone()))
382 || creating.contains(&(conn_id, &name.item))
383 {
384 return Err(
385 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
386 );
387 } else {
388 creating.insert((conn_id, &name.item));
389 temporary_ids.insert(id.clone());
390 }
391 }
392 }
393 }
394 Ok(temporary_ids)
395 }
396
397 #[instrument(name = "catalog::transact")]
398 pub async fn transact(
399 &mut self,
400 storage_collections: Option<
403 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
404 >,
405 oracle_write_ts: mz_repr::Timestamp,
406 session: Option<&ConnMeta>,
407 ops: Vec<Op>,
408 ) -> Result<TransactionResult, AdapterError> {
409 trace!("transact: {:?}", ops);
410 fail::fail_point!("catalog_transact", |arg| {
411 Err(AdapterError::Unstructured(anyhow::anyhow!(
412 "failpoint: {arg:?}"
413 )))
414 });
415
416 let drop_ids: BTreeSet<CatalogItemId> = ops
417 .iter()
418 .filter_map(|op| match op {
419 Op::DropObjects(drop_object_infos) => {
420 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
421 let item_ids = ids.filter_map(|id| match id {
422 ObjectId::Item(id) => Some(id),
423 _ => None,
424 });
425 Some(item_ids)
426 }
427 _ => None,
428 })
429 .flatten()
430 .collect();
431 let temporary_drops = drop_ids
432 .iter()
433 .filter_map(|id| {
434 let entry = self.get_entry(id);
435 match entry.item().conn_id() {
436 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
437 None => None,
438 }
439 })
440 .collect();
441 let dropped_global_ids = drop_ids
442 .iter()
443 .flat_map(|item_id| self.get_global_ids(item_id))
444 .collect();
445
446 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
447 let mut builtin_table_updates = vec![];
448 let mut catalog_updates = vec![];
449 let mut audit_events = vec![];
450 let mut storage = self.storage().await;
451 let mut tx = storage
452 .transaction()
453 .await
454 .unwrap_or_terminate("starting catalog transaction");
455
456 let new_state = Self::transact_inner(
457 TransactInnerMode::Commit,
458 storage_collections,
459 oracle_write_ts,
460 session,
461 ops,
462 temporary_ids,
463 &mut builtin_table_updates,
464 &mut catalog_updates,
465 &mut audit_events,
466 &mut tx,
467 &self.state,
468 )
469 .await?;
470
471 tx.commit(oracle_write_ts)
476 .await
477 .unwrap_or_terminate("catalog storage transaction commit must succeed");
478
479 drop(storage);
482 if let Some(new_state) = new_state {
483 self.transient_revision += 1;
484 self.state = new_state;
485 }
486
487 let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
489 if self.state.system_config().enable_mz_notices() {
490 self.state().pack_optimizer_notices(
492 &mut builtin_table_updates,
493 dropped_notices.iter(),
494 Diff::MINUS_ONE,
495 );
496 }
497
498 Ok(TransactionResult {
499 builtin_table_updates,
500 catalog_updates,
501 audit_events,
502 })
503 }
504
505 pub async fn transact_incremental_dry_run(
520 &self,
521 base_state: &CatalogState,
522 ops: Vec<Op>,
523 session: Option<&ConnMeta>,
524 prev_snapshot: Option<Snapshot>,
525 oracle_write_ts: mz_repr::Timestamp,
526 ) -> Result<(CatalogState, Snapshot), AdapterError> {
527 let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;
530
531 let mut builtin_table_updates = vec![];
532 let mut catalog_updates = vec![];
533 let mut audit_events = vec![];
534 let mut storage = self.storage().await;
535 let mut tx = if let Some(snapshot) = prev_snapshot {
536 storage
539 .transaction_from_snapshot(snapshot)
540 .unwrap_or_terminate("starting catalog transaction from snapshot")
541 } else {
542 storage
545 .transaction()
546 .await
547 .unwrap_or_terminate("starting catalog transaction")
548 };
549
550 let new_state = Self::transact_inner(
552 TransactInnerMode::DryRun,
553 None,
554 oracle_write_ts,
555 session,
556 ops,
557 temporary_ids,
558 &mut builtin_table_updates,
559 &mut catalog_updates,
560 &mut audit_events,
561 &mut tx,
562 base_state,
563 )
564 .await?;
565
566 let new_snapshot = tx.current_snapshot();
569
570 drop(storage);
572
573 let state = new_state.unwrap_or_else(|| base_state.clone());
575 Ok((state, new_snapshot))
576 }
577
578 fn extract_expressions_from_ops(
582 ops: &[Op],
583 optimizer_features: &OptimizerFeatures,
584 ) -> BTreeMap<GlobalId, LocalExpressions> {
585 let mut exprs = BTreeMap::new();
586
587 for op in ops {
588 if let Op::CreateItem { item, .. } = op {
589 match item {
590 CatalogItem::View(view) => {
591 exprs.insert(
592 view.global_id,
593 LocalExpressions {
594 local_mir: (*view.optimized_expr).clone(),
595 optimizer_features: optimizer_features.clone(),
596 },
597 );
598 }
599 CatalogItem::MaterializedView(mv) => {
600 exprs.insert(
601 mv.global_id_writes(),
602 LocalExpressions {
603 local_mir: (*mv.optimized_expr).clone(),
604 optimizer_features: optimizer_features.clone(),
605 },
606 );
607 }
608 _ => {}
609 }
610 }
611 }
612
613 exprs
614 }
615
616 #[instrument(name = "catalog::transact_inner")]
624 async fn transact_inner(
625 mode: TransactInnerMode,
626 storage_collections: Option<
627 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
628 >,
629 oracle_write_ts: mz_repr::Timestamp,
630 session: Option<&ConnMeta>,
631 ops: Vec<Op>,
632 temporary_ids: BTreeSet<CatalogItemId>,
633 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
634 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
635 audit_events: &mut Vec<VersionedEvent>,
636 tx: &mut Transaction<'_>,
637 state: &CatalogState,
638 ) -> Result<Option<CatalogState>, AdapterError> {
639 let mut preliminary_state = Cow::Borrowed(state);
666
667 let mut state = Cow::Borrowed(state);
669
670 if ops.is_empty() {
671 return Ok(None);
672 }
673
674 let optimizer_features = OptimizerFeatures::from(state.system_config());
677 let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
678
679 let mut storage_collections_to_create = BTreeSet::new();
680 let mut storage_collections_to_drop = BTreeSet::new();
681 let mut storage_collections_to_register = BTreeMap::new();
682
683 let mut updates = Vec::new();
684
685 for op in ops {
686 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
687 oracle_write_ts,
688 session,
689 op,
690 &temporary_ids,
691 audit_events,
692 tx,
693 &*preliminary_state,
694 &mut storage_collections_to_create,
695 &mut storage_collections_to_drop,
696 &mut storage_collections_to_register,
697 )
698 .await?;
699
700 if let Some(builtin_table_update) = weird_builtin_table_update {
709 builtin_table_updates.push(builtin_table_update);
710 }
711
712 let upper = tx.upper();
717 let temporary_item_updates =
718 temporary_item_updates
719 .into_iter()
720 .map(|(item, diff)| StateUpdate {
721 kind: StateUpdateKind::TemporaryItem(item),
722 ts: upper,
723 diff,
724 });
725
726 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
727 op_updates.extend(temporary_item_updates);
728 if !op_updates.is_empty() {
729 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
732 let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
733 .to_mut()
734 .apply_updates(op_updates.clone(), &mut local_expr_cache)
735 .await;
736 }
737 updates.append(&mut op_updates);
738 }
739
740 if !updates.is_empty() {
741 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
742 let (op_builtin_table_updates, op_catalog_updates) = state
743 .to_mut()
744 .apply_updates(updates.clone(), &mut local_expr_cache)
745 .await;
746 let op_builtin_table_updates = state
747 .to_mut()
748 .resolve_builtin_table_updates(op_builtin_table_updates);
749 builtin_table_updates.extend(op_builtin_table_updates);
750 parsed_catalog_updates.extend(op_catalog_updates);
751 }
752
753 match mode {
754 TransactInnerMode::Commit => {
755 if let Some(c) = storage_collections {
757 c.prepare_state(
758 tx,
759 storage_collections_to_create,
760 storage_collections_to_drop,
761 storage_collections_to_register,
762 )
763 .await?;
764 }
765 }
766 TransactInnerMode::DryRun => {
767 debug_assert!(
768 storage_collections.is_none(),
769 "dry-run mode must not prepare storage state"
770 );
771 }
772 }
773
774 let updates = tx.get_and_commit_op_updates();
775 if !updates.is_empty() {
776 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
777 let (op_builtin_table_updates, op_catalog_updates) = state
778 .to_mut()
779 .apply_updates(updates.clone(), &mut local_expr_cache)
780 .await;
781 let op_builtin_table_updates = state
782 .to_mut()
783 .resolve_builtin_table_updates(op_builtin_table_updates);
784 builtin_table_updates.extend(op_builtin_table_updates);
785 parsed_catalog_updates.extend(op_catalog_updates);
786 }
787
788 match state {
789 Cow::Owned(state) => Ok(Some(state)),
790 Cow::Borrowed(_) => Ok(None),
791 }
792 }
793
794 #[instrument]
802 async fn transact_op(
803 oracle_write_ts: mz_repr::Timestamp,
804 session: Option<&ConnMeta>,
805 op: Op,
806 temporary_ids: &BTreeSet<CatalogItemId>,
807 audit_events: &mut Vec<VersionedEvent>,
808 tx: &mut Transaction<'_>,
809 state: &CatalogState,
810 storage_collections_to_create: &mut BTreeSet<GlobalId>,
811 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
812 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
813 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
814 let mut weird_builtin_table_update = None;
815 let mut temporary_item_updates = Vec::new();
816
817 match op {
818 Op::AlterRetainHistory { id, value, window } => {
819 let entry = state.get_entry(&id);
820 if id.is_system() {
821 let name = entry.name();
822 let full_name =
823 state.resolve_full_name(name, session.map(|session| session.conn_id()));
824 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
825 full_name.to_string(),
826 ))));
827 }
828
829 let mut new_entry = entry.clone();
830 let previous = new_entry
831 .item
832 .update_retain_history(value.clone(), window)
833 .map_err(|_| {
834 AdapterError::Catalog(Error::new(ErrorKind::Internal(
835 "planner should have rejected invalid alter retain history item type"
836 .to_string(),
837 )))
838 })?;
839
840 if Self::should_audit_log_item(new_entry.item()) {
841 let details =
842 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
843 id: id.to_string(),
844 old_history: previous.map(|previous| previous.to_string()),
845 new_history: value.map(|v| v.to_string()),
846 });
847 CatalogState::add_to_audit_log(
848 &state.system_configuration,
849 oracle_write_ts,
850 session,
851 tx,
852 audit_events,
853 EventType::Alter,
854 catalog_type_to_audit_object_type(new_entry.item().typ()),
855 details,
856 )?;
857 }
858
859 tx.update_item(id, new_entry.into())?;
860
861 Self::log_update(state, &id);
862 }
863 Op::AlterSourceTimestampInterval {
864 id,
865 value,
866 interval,
867 } => {
868 let entry = state.get_entry(&id);
869 if id.is_system() {
870 let name = entry.name();
871 let full_name =
872 state.resolve_full_name(name, session.map(|session| session.conn_id()));
873 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
874 full_name.to_string(),
875 ))));
876 }
877
878 let mut new_entry = entry.clone();
879 new_entry
880 .item
881 .update_timestamp_interval(value, interval)
882 .map_err(|_| {
883 AdapterError::Catalog(Error::new(ErrorKind::Internal(
884 "planner should have rejected invalid alter timestamp interval item type"
885 .to_string(),
886 )))
887 })?;
888
889 tx.update_item(id, new_entry.into())?;
890
891 Self::log_update(state, &id);
892 }
893 Op::AlterRole {
894 id,
895 name,
896 attributes,
897 nopassword,
898 vars,
899 } => {
900 state.ensure_not_reserved_role(&id)?;
901
902 let mut existing_role = state.get_role(&id).clone();
903 let password = attributes.password.clone();
904 let scram_iterations = attributes
905 .scram_iterations
906 .unwrap_or_else(|| state.system_config().scram_iterations());
907 existing_role.attributes = attributes.into();
908 existing_role.vars = vars;
909 let password_action = if nopassword {
910 PasswordAction::Clear
911 } else if let Some(password) = password {
912 PasswordAction::Set(PasswordConfig {
913 password,
914 scram_iterations,
915 })
916 } else {
917 PasswordAction::NoChange
918 };
919 tx.update_role(id, existing_role.into(), password_action)?;
920
921 CatalogState::add_to_audit_log(
922 &state.system_configuration,
923 oracle_write_ts,
924 session,
925 tx,
926 audit_events,
927 EventType::Alter,
928 ObjectType::Role,
929 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
930 id: id.to_string(),
931 name: name.clone(),
932 }),
933 )?;
934
935 info!("update role {name} ({id})");
936 }
937 Op::AlterNetworkPolicy {
938 id,
939 rules,
940 name,
941 owner_id: _owner_id,
942 } => {
943 let existing_policy = state.get_network_policy(&id).clone();
944 let mut policy: NetworkPolicy = existing_policy.into();
945 policy.rules = rules;
946 if is_reserved_name(&name) {
947 return Err(AdapterError::Catalog(Error::new(
948 ErrorKind::ReservedNetworkPolicyName(name),
949 )));
950 }
951 tx.update_network_policy(id, policy.clone())?;
952
953 CatalogState::add_to_audit_log(
954 &state.system_configuration,
955 oracle_write_ts,
956 session,
957 tx,
958 audit_events,
959 EventType::Alter,
960 ObjectType::NetworkPolicy,
961 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
962 id: id.to_string(),
963 name: name.clone(),
964 }),
965 )?;
966
967 info!("update network policy {name} ({id})");
968 }
969 Op::AlterAddColumn {
970 id,
971 new_global_id,
972 name,
973 typ,
974 sql,
975 } => {
976 let mut new_entry = state.get_entry(&id).clone();
977 let version = new_entry.item.add_column(name, typ, sql)?;
978 let shard_id = state
981 .storage_metadata()
982 .get_collection_shard(new_entry.latest_global_id())?;
983
984 let CatalogItem::Table(table) = &mut new_entry.item else {
986 return Err(AdapterError::Unsupported("adding columns to non-Table"));
987 };
988 table.collections.insert(version, new_global_id);
989
990 tx.update_item(id, new_entry.into())?;
991 storage_collections_to_register.insert(new_global_id, shard_id);
992 }
993 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
994 let mut new_entry = state.get_entry(&id).clone();
995 let replacement = state.get_entry(&replacement_id);
996
997 let new_audit_events =
998 apply_replacement_audit_events(state, &new_entry, replacement);
999
1000 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1001 return Err(AdapterError::internal(
1002 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1003 "id must refer to a materialized view",
1004 ));
1005 };
1006 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1007 return Err(AdapterError::internal(
1008 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1009 "replacement_id must refer to a materialized view",
1010 ));
1011 };
1012
1013 mv.apply_replacement(replacement_mv.clone());
1014
1015 tx.remove_item(replacement_id)?;
1016
1017 new_entry.id = replacement_id;
1018 tx_replace_item(tx, state, id, new_entry)?;
1019
1020 let comment_id = CommentObjectId::MaterializedView(replacement_id);
1021 tx.drop_comments(&[comment_id].into())?;
1022
1023 for (event_type, details) in new_audit_events {
1024 CatalogState::add_to_audit_log(
1025 &state.system_configuration,
1026 oracle_write_ts,
1027 session,
1028 tx,
1029 audit_events,
1030 event_type,
1031 ObjectType::MaterializedView,
1032 details,
1033 )?;
1034 }
1035 }
1036 Op::CreateDatabase { name, owner_id } => {
1037 let database_owner_privileges = vec![rbac::owner_privilege(
1038 mz_sql::catalog::ObjectType::Database,
1039 owner_id,
1040 )];
1041 let database_default_privileges = state
1042 .default_privileges
1043 .get_applicable_privileges(
1044 owner_id,
1045 None,
1046 None,
1047 mz_sql::catalog::ObjectType::Database,
1048 )
1049 .map(|item| item.mz_acl_item(owner_id));
1050 let database_privileges: Vec<_> = merge_mz_acl_items(
1051 database_owner_privileges
1052 .into_iter()
1053 .chain(database_default_privileges),
1054 )
1055 .collect();
1056
1057 let schema_owner_privileges = vec![rbac::owner_privilege(
1058 mz_sql::catalog::ObjectType::Schema,
1059 owner_id,
1060 )];
1061 let schema_default_privileges = state
1062 .default_privileges
1063 .get_applicable_privileges(
1064 owner_id,
1065 None,
1066 None,
1067 mz_sql::catalog::ObjectType::Schema,
1068 )
1069 .map(|item| item.mz_acl_item(owner_id))
1070 .chain(std::iter::once(MzAclItem {
1072 grantee: RoleId::Public,
1073 grantor: owner_id,
1074 acl_mode: AclMode::USAGE,
1075 }));
1076 let schema_privileges: Vec<_> = merge_mz_acl_items(
1077 schema_owner_privileges
1078 .into_iter()
1079 .chain(schema_default_privileges),
1080 )
1081 .collect();
1082
1083 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1084 let (database_id, _) = tx.insert_user_database(
1085 &name,
1086 owner_id,
1087 database_privileges.clone(),
1088 &temporary_oids,
1089 )?;
1090 let (schema_id, _) = tx.insert_user_schema(
1091 database_id,
1092 DEFAULT_SCHEMA,
1093 owner_id,
1094 schema_privileges.clone(),
1095 &temporary_oids,
1096 )?;
1097 CatalogState::add_to_audit_log(
1098 &state.system_configuration,
1099 oracle_write_ts,
1100 session,
1101 tx,
1102 audit_events,
1103 EventType::Create,
1104 ObjectType::Database,
1105 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1106 id: database_id.to_string(),
1107 name: name.clone(),
1108 }),
1109 )?;
1110 info!("create database {}", name);
1111
1112 CatalogState::add_to_audit_log(
1113 &state.system_configuration,
1114 oracle_write_ts,
1115 session,
1116 tx,
1117 audit_events,
1118 EventType::Create,
1119 ObjectType::Schema,
1120 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1121 id: schema_id.to_string(),
1122 name: DEFAULT_SCHEMA.to_string(),
1123 database_name: Some(name),
1124 }),
1125 )?;
1126 }
1127 Op::CreateSchema {
1128 database_id,
1129 schema_name,
1130 owner_id,
1131 } => {
1132 if is_reserved_name(&schema_name) {
1133 return Err(AdapterError::Catalog(Error::new(
1134 ErrorKind::ReservedSchemaName(schema_name),
1135 )));
1136 }
1137 let database_id = match database_id {
1138 ResolvedDatabaseSpecifier::Id(id) => id,
1139 ResolvedDatabaseSpecifier::Ambient => {
1140 return Err(AdapterError::Catalog(Error::new(
1141 ErrorKind::ReadOnlySystemSchema(schema_name),
1142 )));
1143 }
1144 };
1145 let owner_privileges = vec![rbac::owner_privilege(
1146 mz_sql::catalog::ObjectType::Schema,
1147 owner_id,
1148 )];
1149 let default_privileges = state
1150 .default_privileges
1151 .get_applicable_privileges(
1152 owner_id,
1153 Some(database_id),
1154 None,
1155 mz_sql::catalog::ObjectType::Schema,
1156 )
1157 .map(|item| item.mz_acl_item(owner_id));
1158 let privileges: Vec<_> =
1159 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1160 .collect();
1161 let (schema_id, _) = tx.insert_user_schema(
1162 database_id,
1163 &schema_name,
1164 owner_id,
1165 privileges.clone(),
1166 &state.get_temporary_oids().collect(),
1167 )?;
1168 CatalogState::add_to_audit_log(
1169 &state.system_configuration,
1170 oracle_write_ts,
1171 session,
1172 tx,
1173 audit_events,
1174 EventType::Create,
1175 ObjectType::Schema,
1176 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1177 id: schema_id.to_string(),
1178 name: schema_name.clone(),
1179 database_name: Some(state.database_by_id[&database_id].name.clone()),
1180 }),
1181 )?;
1182 }
1183 Op::CreateRole { name, attributes } => {
1184 if is_reserved_role_name(&name) {
1185 return Err(AdapterError::Catalog(Error::new(
1186 ErrorKind::ReservedRoleName(name),
1187 )));
1188 }
1189 let membership = RoleMembership::new();
1190 let vars = RoleVars::default();
1191 let (id, _) = tx.insert_user_role(
1192 name.clone(),
1193 attributes.clone(),
1194 membership.clone(),
1195 vars.clone(),
1196 &state.get_temporary_oids().collect(),
1197 )?;
1198 CatalogState::add_to_audit_log(
1199 &state.system_configuration,
1200 oracle_write_ts,
1201 session,
1202 tx,
1203 audit_events,
1204 EventType::Create,
1205 ObjectType::Role,
1206 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1207 id: id.to_string(),
1208 name: name.clone(),
1209 }),
1210 )?;
1211 info!("create role {}", name);
1212 }
1213 Op::CreateCluster {
1214 id,
1215 name,
1216 introspection_sources,
1217 owner_id,
1218 config,
1219 } => {
1220 if is_reserved_name(&name) {
1221 return Err(AdapterError::Catalog(Error::new(
1222 ErrorKind::ReservedClusterName(name),
1223 )));
1224 }
1225 let owner_privileges = vec![rbac::owner_privilege(
1226 mz_sql::catalog::ObjectType::Cluster,
1227 owner_id,
1228 )];
1229 let default_privileges = state
1230 .default_privileges
1231 .get_applicable_privileges(
1232 owner_id,
1233 None,
1234 None,
1235 mz_sql::catalog::ObjectType::Cluster,
1236 )
1237 .map(|item| item.mz_acl_item(owner_id));
1238 let privileges: Vec<_> =
1239 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1240 .collect();
1241 let introspection_source_ids: Vec<_> = introspection_sources
1242 .iter()
1243 .map(|introspection_source| {
1244 Transaction::allocate_introspection_source_index_id(
1245 &id,
1246 introspection_source.variant,
1247 )
1248 })
1249 .collect();
1250
1251 let introspection_sources = introspection_sources
1252 .into_iter()
1253 .zip_eq(introspection_source_ids)
1254 .map(|(log, (item_id, gid))| (log, item_id, gid))
1255 .collect();
1256
1257 tx.insert_user_cluster(
1258 id,
1259 &name,
1260 introspection_sources,
1261 owner_id,
1262 privileges.clone(),
1263 config.clone().into(),
1264 &state.get_temporary_oids().collect(),
1265 )?;
1266 CatalogState::add_to_audit_log(
1267 &state.system_configuration,
1268 oracle_write_ts,
1269 session,
1270 tx,
1271 audit_events,
1272 EventType::Create,
1273 ObjectType::Cluster,
1274 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1275 id: id.to_string(),
1276 name: name.clone(),
1277 }),
1278 )?;
1279 info!("create cluster {}", name);
1280 }
1281 Op::CreateClusterReplica {
1282 cluster_id,
1283 name,
1284 config,
1285 owner_id,
1286 reason,
1287 } => {
1288 if is_reserved_name(&name) {
1289 return Err(AdapterError::Catalog(Error::new(
1290 ErrorKind::ReservedReplicaName(name),
1291 )));
1292 }
1293 let cluster = state.get_cluster(cluster_id);
1294 let id =
1295 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1296 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1297 size,
1298 billed_as,
1299 internal,
1300 ..
1301 }) = &config.location
1302 {
1303 let (reason, scheduling_policies) = reason.into_audit_log();
1304 let details = EventDetails::CreateClusterReplicaV4(
1305 mz_audit_log::CreateClusterReplicaV4 {
1306 cluster_id: cluster_id.to_string(),
1307 cluster_name: cluster.name.clone(),
1308 replica_id: Some(id.to_string()),
1309 replica_name: name.clone(),
1310 logical_size: size.clone(),
1311 billed_as: billed_as.clone(),
1312 internal: *internal,
1313 reason,
1314 scheduling_policies,
1315 },
1316 );
1317 CatalogState::add_to_audit_log(
1318 &state.system_configuration,
1319 oracle_write_ts,
1320 session,
1321 tx,
1322 audit_events,
1323 EventType::Create,
1324 ObjectType::ClusterReplica,
1325 details,
1326 )?;
1327 }
1328 }
1329 Op::CreateItem {
1330 id,
1331 name,
1332 item,
1333 owner_id,
1334 } => {
1335 state.check_unstable_dependencies(&item)?;
1336
1337 match &item {
1338 CatalogItem::Table(table) => {
1339 let gids: Vec<_> = table.global_ids().collect();
1340 assert_eq!(gids.len(), 1);
1341 storage_collections_to_create.extend(gids);
1342 }
1343 CatalogItem::Source(source) => {
1344 storage_collections_to_create.insert(source.global_id());
1345 }
1346 CatalogItem::MaterializedView(mv) => {
1347 let mv_gid = mv.global_id_writes();
1348 if let Some(target_id) = mv.replacement_target {
1349 let target_gid = state.get_entry(&target_id).latest_global_id();
1350 let shard_id =
1351 state.storage_metadata().get_collection_shard(target_gid)?;
1352 storage_collections_to_register.insert(mv_gid, shard_id);
1353 } else {
1354 storage_collections_to_create.insert(mv_gid);
1355 }
1356 }
1357 CatalogItem::ContinualTask(ct) => {
1358 storage_collections_to_create.insert(ct.global_id());
1359 }
1360 CatalogItem::Sink(sink) => {
1361 storage_collections_to_create.insert(sink.global_id());
1362 }
1363 CatalogItem::Log(_)
1364 | CatalogItem::View(_)
1365 | CatalogItem::Index(_)
1366 | CatalogItem::Type(_)
1367 | CatalogItem::Func(_)
1368 | CatalogItem::Secret(_)
1369 | CatalogItem::Connection(_) => (),
1370 }
1371
1372 let system_user = session.map_or(false, |s| s.user().is_system_user());
1373 if !system_user {
1374 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1375 let cluster_name = state.clusters_by_id[&id].name.clone();
1376 return Err(AdapterError::Catalog(Error::new(
1377 ErrorKind::ReadOnlyCluster(cluster_name),
1378 )));
1379 }
1380 }
1381
1382 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1383 let default_privileges = state
1384 .default_privileges
1385 .get_applicable_privileges(
1386 owner_id,
1387 name.qualifiers.database_spec.id(),
1388 Some(name.qualifiers.schema_spec.into()),
1389 item.typ().into(),
1390 )
1391 .map(|item| item.mz_acl_item(owner_id));
1392 let progress_source_privilege = if item.is_progress_source() {
1394 Some(MzAclItem {
1395 grantee: MZ_SUPPORT_ROLE_ID,
1396 grantor: owner_id,
1397 acl_mode: AclMode::SELECT,
1398 })
1399 } else {
1400 None
1401 };
1402 let privileges: Vec<_> = merge_mz_acl_items(
1403 owner_privileges
1404 .into_iter()
1405 .chain(default_privileges)
1406 .chain(progress_source_privilege),
1407 )
1408 .collect();
1409
1410 let temporary_oids = state.get_temporary_oids().collect();
1411
1412 if item.is_temporary() {
1413 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1414 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1415 {
1416 return Err(AdapterError::Catalog(Error::new(
1417 ErrorKind::InvalidTemporarySchema,
1418 )));
1419 }
1420 let oid = tx.allocate_oid(&temporary_oids)?;
1421
1422 let schema_id = name.qualifiers.schema_spec.clone().into();
1423 let item_type = item.typ();
1424 let (create_sql, global_id, versions) = item.to_serialized();
1425
1426 let item = TemporaryItem {
1427 id,
1428 oid,
1429 global_id,
1430 schema_id,
1431 name: name.item.clone(),
1432 create_sql,
1433 conn_id: item.conn_id().cloned(),
1434 owner_id,
1435 privileges: privileges.clone(),
1436 extra_versions: versions,
1437 };
1438 temporary_item_updates.push((item, StateDiff::Addition));
1439
1440 info!(
1441 "create temporary {} {} ({})",
1442 item_type,
1443 state.resolve_full_name(&name, None),
1444 id
1445 );
1446 } else {
1447 if let Some(temp_id) =
1448 item.uses()
1449 .iter()
1450 .find(|id| match state.try_get_entry(*id) {
1451 Some(entry) => entry.item().is_temporary(),
1452 None => temporary_ids.contains(id),
1453 })
1454 {
1455 let temp_item = state.get_entry(temp_id);
1456 return Err(AdapterError::Catalog(Error::new(
1457 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1458 )));
1459 }
1460 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1461 && !system_user
1462 {
1463 let schema_name = state
1464 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1465 .schema;
1466 return Err(AdapterError::Catalog(Error::new(
1467 ErrorKind::ReadOnlySystemSchema(schema_name),
1468 )));
1469 }
1470 let schema_id = name.qualifiers.schema_spec.clone().into();
1471 let item_type = item.typ();
1472 let (create_sql, global_id, versions) = item.to_serialized();
1473 tx.insert_user_item(
1474 id,
1475 global_id,
1476 schema_id,
1477 &name.item,
1478 create_sql,
1479 owner_id,
1480 privileges.clone(),
1481 &temporary_oids,
1482 versions,
1483 )?;
1484 info!(
1485 "create {} {} ({})",
1486 item_type,
1487 state.resolve_full_name(&name, None),
1488 id
1489 );
1490 }
1491
1492 if Self::should_audit_log_item(&item) {
1493 let name = Self::full_name_detail(
1494 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1495 );
1496 let details = match &item {
1497 CatalogItem::Source(s) => {
1498 let cluster_id = match s.data_source {
1499 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1502 match state.get_entry(&ingestion_id).cluster_id() {
1503 Some(cluster_id) => Some(cluster_id.to_string()),
1504 None => None,
1505 }
1506 }
1507 _ => match item.cluster_id() {
1508 Some(cluster_id) => Some(cluster_id.to_string()),
1509 None => None,
1510 },
1511 };
1512
1513 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1514 id: id.to_string(),
1515 cluster_id,
1516 name,
1517 external_type: s.source_type().to_string(),
1518 })
1519 }
1520 CatalogItem::Sink(s) => {
1521 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1522 id: id.to_string(),
1523 cluster_id: Some(s.cluster_id.to_string()),
1524 name,
1525 external_type: s.sink_type().to_string(),
1526 })
1527 }
1528 CatalogItem::Index(i) => {
1529 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1530 id: id.to_string(),
1531 name,
1532 cluster_id: i.cluster_id.to_string(),
1533 })
1534 }
1535 CatalogItem::MaterializedView(mv) => {
1536 EventDetails::CreateMaterializedViewV1(
1537 mz_audit_log::CreateMaterializedViewV1 {
1538 id: id.to_string(),
1539 name,
1540 cluster_id: mv.cluster_id.to_string(),
1541 replacement_target_id: mv
1542 .replacement_target
1543 .map(|id| id.to_string()),
1544 },
1545 )
1546 }
1547 _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1548 id: id.to_string(),
1549 name,
1550 }),
1551 };
1552 CatalogState::add_to_audit_log(
1553 &state.system_configuration,
1554 oracle_write_ts,
1555 session,
1556 tx,
1557 audit_events,
1558 EventType::Create,
1559 catalog_type_to_audit_object_type(item.typ()),
1560 details,
1561 )?;
1562 }
1563 }
1564 Op::CreateNetworkPolicy {
1565 rules,
1566 name,
1567 owner_id,
1568 } => {
1569 if state.network_policies_by_name.contains_key(&name) {
1570 return Err(AdapterError::PlanError(PlanError::Catalog(
1571 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1572 )));
1573 }
1574 if is_reserved_name(&name) {
1575 return Err(AdapterError::Catalog(Error::new(
1576 ErrorKind::ReservedNetworkPolicyName(name),
1577 )));
1578 }
1579
1580 let owner_privileges = vec![rbac::owner_privilege(
1581 mz_sql::catalog::ObjectType::NetworkPolicy,
1582 owner_id,
1583 )];
1584 let default_privileges = state
1585 .default_privileges
1586 .get_applicable_privileges(
1587 owner_id,
1588 None,
1589 None,
1590 mz_sql::catalog::ObjectType::NetworkPolicy,
1591 )
1592 .map(|item| item.mz_acl_item(owner_id));
1593 let privileges: Vec<_> =
1594 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1595 .collect();
1596
1597 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1598 let id = tx.insert_user_network_policy(
1599 name.clone(),
1600 rules,
1601 privileges,
1602 owner_id,
1603 &temporary_oids,
1604 )?;
1605
1606 CatalogState::add_to_audit_log(
1607 &state.system_configuration,
1608 oracle_write_ts,
1609 session,
1610 tx,
1611 audit_events,
1612 EventType::Create,
1613 ObjectType::NetworkPolicy,
1614 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1615 id: id.to_string(),
1616 name: name.clone(),
1617 }),
1618 )?;
1619
1620 info!("created network policy {name} ({id})");
1621 }
1622 Op::Comment {
1623 object_id,
1624 sub_component,
1625 comment,
1626 } => {
1627 tx.update_comment(object_id, sub_component, comment)?;
1628 let entry = state.get_comment_id_entry(&object_id);
1629 let should_log = entry
1630 .map(|entry| Self::should_audit_log_item(entry.item()))
1631 .unwrap_or(true);
1633 if let (Some(conn_id), true) =
1636 (session.map(|session| session.conn_id()), should_log)
1637 {
1638 CatalogState::add_to_audit_log(
1639 &state.system_configuration,
1640 oracle_write_ts,
1641 session,
1642 tx,
1643 audit_events,
1644 EventType::Comment,
1645 comment_id_to_audit_object_type(object_id),
1646 EventDetails::IdNameV1(IdNameV1 {
1647 id: format!("{object_id:?}"),
1649 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1650 }),
1651 )?;
1652 }
1653 }
1654 Op::UpdateSourceReferences {
1655 source_id,
1656 references,
1657 } => {
1658 tx.update_source_references(
1659 source_id,
1660 references
1661 .references
1662 .into_iter()
1663 .map(|reference| reference.into())
1664 .collect(),
1665 references.updated_at,
1666 )?;
1667 }
1668 Op::DropObjects(drop_object_infos) => {
1669 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1671
1672 tx.drop_comments(&delta.comments)?;
1674
1675 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1677 delta
1678 .items
1679 .iter()
1680 .map(|id| id)
1681 .partition(|id| !state.get_entry(*id).item().is_temporary());
1682 tx.remove_items(&durable_items_to_drop)?;
1683 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1684 let entry = state.get_entry(&id);
1685 (entry.clone().into(), StateDiff::Retraction)
1686 }));
1687
1688 for item_id in delta.items {
1689 let entry = state.get_entry(&item_id);
1690
1691 if entry.item().is_storage_collection() && entry.replacement_target().is_none()
1695 {
1696 storage_collections_to_drop.extend(entry.global_ids());
1697 }
1698
1699 if state.source_references.contains_key(&item_id) {
1700 tx.remove_source_references(item_id)?;
1701 }
1702
1703 if Self::should_audit_log_item(entry.item()) {
1704 CatalogState::add_to_audit_log(
1705 &state.system_configuration,
1706 oracle_write_ts,
1707 session,
1708 tx,
1709 audit_events,
1710 EventType::Drop,
1711 catalog_type_to_audit_object_type(entry.item().typ()),
1712 EventDetails::IdFullNameV1(IdFullNameV1 {
1713 id: item_id.to_string(),
1714 name: Self::full_name_detail(&state.resolve_full_name(
1715 entry.name(),
1716 session.map(|session| session.conn_id()),
1717 )),
1718 }),
1719 )?;
1720 }
1721 info!(
1722 "drop {} {} ({})",
1723 entry.item_type(),
1724 state.resolve_full_name(entry.name(), entry.conn_id()),
1725 item_id
1726 );
1727 }
1728
1729 let schemas = delta
1731 .schemas
1732 .iter()
1733 .map(|(schema_spec, database_spec)| {
1734 (SchemaId::from(schema_spec), *database_spec)
1735 })
1736 .collect();
1737 tx.remove_schemas(&schemas)?;
1738
1739 for (schema_spec, database_spec) in delta.schemas {
1740 let schema = state.get_schema(
1741 &database_spec,
1742 &schema_spec,
1743 session
1744 .map(|session| session.conn_id())
1745 .unwrap_or(&SYSTEM_CONN_ID),
1746 );
1747
1748 let schema_id = SchemaId::from(schema_spec);
1749 let database_id = match database_spec {
1750 ResolvedDatabaseSpecifier::Ambient => None,
1751 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1752 };
1753
1754 CatalogState::add_to_audit_log(
1755 &state.system_configuration,
1756 oracle_write_ts,
1757 session,
1758 tx,
1759 audit_events,
1760 EventType::Drop,
1761 ObjectType::Schema,
1762 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1763 id: schema_id.to_string(),
1764 name: schema.name.schema.to_string(),
1765 database_name: database_id
1766 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1767 }),
1768 )?;
1769 }
1770
1771 tx.remove_databases(&delta.databases)?;
1773
1774 for database_id in delta.databases {
1775 let database = state.get_database(&database_id).clone();
1776
1777 CatalogState::add_to_audit_log(
1778 &state.system_configuration,
1779 oracle_write_ts,
1780 session,
1781 tx,
1782 audit_events,
1783 EventType::Drop,
1784 ObjectType::Database,
1785 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1786 id: database_id.to_string(),
1787 name: database.name.clone(),
1788 }),
1789 )?;
1790 }
1791
1792 tx.remove_user_roles(&delta.roles)?;
1794
1795 for role_id in delta.roles {
1796 let role = state
1797 .roles_by_id
1798 .get(&role_id)
1799 .expect("catalog out of sync");
1800
1801 CatalogState::add_to_audit_log(
1802 &state.system_configuration,
1803 oracle_write_ts,
1804 session,
1805 tx,
1806 audit_events,
1807 EventType::Drop,
1808 ObjectType::Role,
1809 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1810 id: role.id.to_string(),
1811 name: role.name.clone(),
1812 }),
1813 )?;
1814 info!("drop role {}", role.name());
1815 }
1816
1817 tx.remove_network_policies(&delta.network_policies)?;
1819
1820 for network_policy_id in delta.network_policies {
1821 let policy = state
1822 .network_policies_by_id
1823 .get(&network_policy_id)
1824 .expect("catalog out of sync");
1825
1826 CatalogState::add_to_audit_log(
1827 &state.system_configuration,
1828 oracle_write_ts,
1829 session,
1830 tx,
1831 audit_events,
1832 EventType::Drop,
1833 ObjectType::NetworkPolicy,
1834 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1835 id: policy.id.to_string(),
1836 name: policy.name.clone(),
1837 }),
1838 )?;
1839 info!("drop network policy {}", policy.name.clone());
1840 }
1841
1842 let replicas = delta.replicas.keys().copied().collect();
1844 tx.remove_cluster_replicas(&replicas)?;
1845
1846 for (replica_id, (cluster_id, reason)) in delta.replicas {
1847 let cluster = state.get_cluster(cluster_id);
1848 let replica = cluster.replica(replica_id).expect("Must exist");
1849
1850 let (reason, scheduling_policies) = reason.into_audit_log();
1851 let details =
1852 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1853 cluster_id: cluster_id.to_string(),
1854 cluster_name: cluster.name.clone(),
1855 replica_id: Some(replica_id.to_string()),
1856 replica_name: replica.name.clone(),
1857 reason,
1858 scheduling_policies,
1859 });
1860 CatalogState::add_to_audit_log(
1861 &state.system_configuration,
1862 oracle_write_ts,
1863 session,
1864 tx,
1865 audit_events,
1866 EventType::Drop,
1867 ObjectType::ClusterReplica,
1868 details,
1869 )?;
1870 }
1871
1872 tx.remove_clusters(&delta.clusters)?;
1874
1875 for cluster_id in delta.clusters {
1876 let cluster = state.get_cluster(cluster_id);
1877
1878 CatalogState::add_to_audit_log(
1879 &state.system_configuration,
1880 oracle_write_ts,
1881 session,
1882 tx,
1883 audit_events,
1884 EventType::Drop,
1885 ObjectType::Cluster,
1886 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1887 id: cluster.id.to_string(),
1888 name: cluster.name.clone(),
1889 }),
1890 )?;
1891 }
1892 }
1893 Op::GrantRole {
1894 role_id,
1895 member_id,
1896 grantor_id,
1897 } => {
1898 state.ensure_not_reserved_role(&member_id)?;
1899 state.ensure_grantable_role(&role_id)?;
1900 if state.collect_role_membership(&role_id).contains(&member_id) {
1901 let group_role = state.get_role(&role_id);
1902 let member_role = state.get_role(&member_id);
1903 return Err(AdapterError::Catalog(Error::new(
1904 ErrorKind::CircularRoleMembership {
1905 role_name: group_role.name().to_string(),
1906 member_name: member_role.name().to_string(),
1907 },
1908 )));
1909 }
1910 let mut member_role = state.get_role(&member_id).clone();
1911 member_role.membership.map.insert(role_id, grantor_id);
1912 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1913
1914 CatalogState::add_to_audit_log(
1915 &state.system_configuration,
1916 oracle_write_ts,
1917 session,
1918 tx,
1919 audit_events,
1920 EventType::Grant,
1921 ObjectType::Role,
1922 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1923 role_id: role_id.to_string(),
1924 member_id: member_id.to_string(),
1925 grantor_id: grantor_id.to_string(),
1926 executed_by: session
1927 .map(|session| session.authenticated_role_id())
1928 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1929 .to_string(),
1930 }),
1931 )?;
1932 }
1933 Op::RevokeRole {
1934 role_id,
1935 member_id,
1936 grantor_id,
1937 } => {
1938 state.ensure_not_reserved_role(&member_id)?;
1939 state.ensure_grantable_role(&role_id)?;
1940 let mut member_role = state.get_role(&member_id).clone();
1941 member_role.membership.map.remove(&role_id);
1942 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1943
1944 CatalogState::add_to_audit_log(
1945 &state.system_configuration,
1946 oracle_write_ts,
1947 session,
1948 tx,
1949 audit_events,
1950 EventType::Revoke,
1951 ObjectType::Role,
1952 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1953 role_id: role_id.to_string(),
1954 member_id: member_id.to_string(),
1955 grantor_id: grantor_id.to_string(),
1956 executed_by: session
1957 .map(|session| session.authenticated_role_id())
1958 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1959 .to_string(),
1960 }),
1961 )?;
1962 }
1963 Op::UpdatePrivilege {
1964 target_id,
1965 privilege,
1966 variant,
1967 } => {
1968 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1969 UpdatePrivilegeVariant::Grant => {
1970 privileges.grant(privilege);
1971 }
1972 UpdatePrivilegeVariant::Revoke => {
1973 privileges.revoke(&privilege);
1974 }
1975 };
1976 match &target_id {
1977 SystemObjectId::Object(object_id) => match object_id {
1978 ObjectId::Cluster(id) => {
1979 let mut cluster = state.get_cluster(*id).clone();
1980 update_privilege_fn(&mut cluster.privileges);
1981 tx.update_cluster(*id, cluster.into())?;
1982 }
1983 ObjectId::Database(id) => {
1984 let mut database = state.get_database(id).clone();
1985 update_privilege_fn(&mut database.privileges);
1986 tx.update_database(*id, database.into())?;
1987 }
1988 ObjectId::NetworkPolicy(id) => {
1989 let mut policy = state.get_network_policy(id).clone();
1990 update_privilege_fn(&mut policy.privileges);
1991 tx.update_network_policy(*id, policy.into())?;
1992 }
1993 ObjectId::Schema((database_spec, schema_spec)) => {
1994 let schema_id = schema_spec.clone().into();
1995 let mut schema = state
1996 .get_schema(
1997 database_spec,
1998 schema_spec,
1999 session
2000 .map(|session| session.conn_id())
2001 .unwrap_or(&SYSTEM_CONN_ID),
2002 )
2003 .clone();
2004 update_privilege_fn(&mut schema.privileges);
2005 tx.update_schema(schema_id, schema.into())?;
2006 }
2007 ObjectId::Item(id) => {
2008 let entry = state.get_entry(id);
2009 let mut new_entry = entry.clone();
2010 update_privilege_fn(&mut new_entry.privileges);
2011 if !new_entry.item().is_temporary() {
2012 tx.update_item(*id, new_entry.into())?;
2013 } else {
2014 temporary_item_updates
2015 .push((entry.clone().into(), StateDiff::Retraction));
2016 temporary_item_updates
2017 .push((new_entry.into(), StateDiff::Addition));
2018 }
2019 }
2020 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
2021 },
2022 SystemObjectId::System => {
2023 let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
2024 update_privilege_fn(&mut system_privileges);
2025 let new_privilege =
2026 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
2027 tx.set_system_privilege(
2028 privilege.grantee,
2029 privilege.grantor,
2030 new_privilege.map(|new_privilege| new_privilege.acl_mode),
2031 )?;
2032 }
2033 }
2034 let object_type = state.get_system_object_type(&target_id);
2035 let object_id_str = match &target_id {
2036 SystemObjectId::System => "SYSTEM".to_string(),
2037 SystemObjectId::Object(id) => id.to_string(),
2038 };
2039 CatalogState::add_to_audit_log(
2040 &state.system_configuration,
2041 oracle_write_ts,
2042 session,
2043 tx,
2044 audit_events,
2045 variant.into(),
2046 system_object_type_to_audit_object_type(&object_type),
2047 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
2048 object_id: object_id_str,
2049 grantee_id: privilege.grantee.to_string(),
2050 grantor_id: privilege.grantor.to_string(),
2051 privileges: privilege.acl_mode.to_string(),
2052 }),
2053 )?;
2054 }
2055 Op::UpdateDefaultPrivilege {
2056 privilege_object,
2057 privilege_acl_item,
2058 variant,
2059 } => {
2060 let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
2061 match variant {
2062 UpdatePrivilegeVariant::Grant => default_privileges
2063 .grant(privilege_object.clone(), privilege_acl_item.clone()),
2064 UpdatePrivilegeVariant::Revoke => {
2065 default_privileges.revoke(&privilege_object, &privilege_acl_item)
2066 }
2067 }
2068 let new_acl_mode = default_privileges
2069 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
2070 tx.set_default_privilege(
2071 privilege_object.role_id,
2072 privilege_object.database_id,
2073 privilege_object.schema_id,
2074 privilege_object.object_type,
2075 privilege_acl_item.grantee,
2076 new_acl_mode.cloned(),
2077 )?;
2078 CatalogState::add_to_audit_log(
2079 &state.system_configuration,
2080 oracle_write_ts,
2081 session,
2082 tx,
2083 audit_events,
2084 variant.into(),
2085 object_type_to_audit_object_type(privilege_object.object_type),
2086 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
2087 role_id: privilege_object.role_id.to_string(),
2088 database_id: privilege_object.database_id.map(|id| id.to_string()),
2089 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
2090 grantee_id: privilege_acl_item.grantee.to_string(),
2091 privileges: privilege_acl_item.acl_mode.to_string(),
2092 }),
2093 )?;
2094 }
2095 Op::RenameCluster {
2096 id,
2097 name,
2098 to_name,
2099 check_reserved_names,
2100 } => {
2101 if id.is_system() {
2102 return Err(AdapterError::Catalog(Error::new(
2103 ErrorKind::ReadOnlyCluster(name.clone()),
2104 )));
2105 }
2106 if check_reserved_names && is_reserved_name(&to_name) {
2107 return Err(AdapterError::Catalog(Error::new(
2108 ErrorKind::ReservedClusterName(to_name),
2109 )));
2110 }
2111 tx.rename_cluster(id, &name, &to_name)?;
2112 CatalogState::add_to_audit_log(
2113 &state.system_configuration,
2114 oracle_write_ts,
2115 session,
2116 tx,
2117 audit_events,
2118 EventType::Alter,
2119 ObjectType::Cluster,
2120 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2121 id: id.to_string(),
2122 old_name: name.clone(),
2123 new_name: to_name.clone(),
2124 }),
2125 )?;
2126 info!("rename cluster {name} to {to_name}");
2127 }
2128 Op::RenameClusterReplica {
2129 cluster_id,
2130 replica_id,
2131 name,
2132 to_name,
2133 } => {
2134 if is_reserved_name(&to_name) {
2135 return Err(AdapterError::Catalog(Error::new(
2136 ErrorKind::ReservedReplicaName(to_name),
2137 )));
2138 }
2139 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2140 CatalogState::add_to_audit_log(
2141 &state.system_configuration,
2142 oracle_write_ts,
2143 session,
2144 tx,
2145 audit_events,
2146 EventType::Alter,
2147 ObjectType::ClusterReplica,
2148 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2149 cluster_id: cluster_id.to_string(),
2150 replica_id: replica_id.to_string(),
2151 old_name: name.replica.as_str().to_string(),
2152 new_name: to_name.clone(),
2153 }),
2154 )?;
2155 info!("rename cluster replica {name} to {to_name}");
2156 }
2157 Op::RenameItem {
2158 id,
2159 to_name,
2160 current_full_name,
2161 } => {
2162 let mut updates = Vec::new();
2163
2164 let entry = state.get_entry(&id);
2165 if let CatalogItem::Type(_) = entry.item() {
2166 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2167 current_full_name.to_string(),
2168 ))));
2169 }
2170
2171 if entry.id().is_system() {
2172 let name = state
2173 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2174 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2175 name.to_string(),
2176 ))));
2177 }
2178
2179 let mut to_full_name = current_full_name.clone();
2180 to_full_name.item.clone_from(&to_name);
2181
2182 let mut to_qualified_name = entry.name().clone();
2183 to_qualified_name.item.clone_from(&to_name);
2184
2185 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2186 id: id.to_string(),
2187 old_name: Self::full_name_detail(¤t_full_name),
2188 new_name: Self::full_name_detail(&to_full_name),
2189 });
2190 if Self::should_audit_log_item(entry.item()) {
2191 CatalogState::add_to_audit_log(
2192 &state.system_configuration,
2193 oracle_write_ts,
2194 session,
2195 tx,
2196 audit_events,
2197 EventType::Alter,
2198 catalog_type_to_audit_object_type(entry.item().typ()),
2199 details,
2200 )?;
2201 }
2202
2203 let mut new_entry = entry.clone();
2205 new_entry.name.item.clone_from(&to_name);
2206 new_entry.item = entry
2207 .item()
2208 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2209 .map_err(|e| {
2210 Error::new(ErrorKind::from(AmbiguousRename {
2211 depender: state
2212 .resolve_full_name(entry.name(), entry.conn_id())
2213 .to_string(),
2214 dependee: state
2215 .resolve_full_name(entry.name(), entry.conn_id())
2216 .to_string(),
2217 message: e,
2218 }))
2219 })?;
2220
2221 for id in entry.referenced_by() {
2222 let dependent_item = state.get_entry(id);
2223 let mut to_entry = dependent_item.clone();
2224 to_entry.item = dependent_item
2225 .item()
2226 .rename_item_refs(
2227 current_full_name.clone(),
2228 to_full_name.item.clone(),
2229 false,
2230 )
2231 .map_err(|e| {
2232 Error::new(ErrorKind::from(AmbiguousRename {
2233 depender: state
2234 .resolve_full_name(
2235 dependent_item.name(),
2236 dependent_item.conn_id(),
2237 )
2238 .to_string(),
2239 dependee: state
2240 .resolve_full_name(entry.name(), entry.conn_id())
2241 .to_string(),
2242 message: e,
2243 }))
2244 })?;
2245
2246 if !to_entry.item().is_temporary() {
2247 tx.update_item(*id, to_entry.into())?;
2248 } else {
2249 temporary_item_updates
2250 .push((dependent_item.clone().into(), StateDiff::Retraction));
2251 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2252 }
2253 updates.push(*id);
2254 }
2255 if !new_entry.item().is_temporary() {
2256 tx.update_item(id, new_entry.into())?;
2257 } else {
2258 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2259 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2260 }
2261
2262 updates.push(id);
2263 for id in updates {
2264 Self::log_update(state, &id);
2265 }
2266 }
2267 Op::RenameSchema {
2268 database_spec,
2269 schema_spec,
2270 new_name,
2271 check_reserved_names,
2272 } => {
2273 if check_reserved_names && is_reserved_name(&new_name) {
2274 return Err(AdapterError::Catalog(Error::new(
2275 ErrorKind::ReservedSchemaName(new_name),
2276 )));
2277 }
2278
2279 let conn_id = session
2280 .map(|session| session.conn_id())
2281 .unwrap_or(&SYSTEM_CONN_ID);
2282
2283 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2284 let cur_name = schema.name().schema.clone();
2285
2286 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2287 return Err(AdapterError::Catalog(Error::new(
2288 ErrorKind::AmbientSchemaRename(cur_name),
2289 )));
2290 };
2291 let database = state.get_database(&database_id);
2292 let database_name = &database.name;
2293
2294 let mut updates = Vec::new();
2295 let mut items_to_update = BTreeMap::new();
2296
2297 let mut update_item = |id| {
2298 if items_to_update.contains_key(id) {
2299 return Ok(());
2300 }
2301
2302 let entry = state.get_entry(id);
2303
2304 let mut new_entry = entry.clone();
2306 new_entry.item = entry
2307 .item
2308 .rename_schema_refs(database_name, &cur_name, &new_name)
2309 .map_err(|(s, _i)| {
2310 Error::new(ErrorKind::from(AmbiguousRename {
2311 depender: state
2312 .resolve_full_name(entry.name(), entry.conn_id())
2313 .to_string(),
2314 dependee: format!("{database_name}.{cur_name}"),
2315 message: format!("ambiguous reference to schema named {s}"),
2316 }))
2317 })?;
2318
2319 if !new_entry.item().is_temporary() {
2321 items_to_update.insert(*id, new_entry.into());
2322 } else {
2323 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2324 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2325 }
2326 updates.push(id);
2327
2328 Ok::<_, AdapterError>(())
2329 };
2330
2331 for (_name, item_id) in &schema.items {
2333 update_item(item_id)?;
2335
2336 for id in state.get_entry(item_id).referenced_by() {
2338 update_item(id)?;
2339 }
2340 }
2341 tx.update_items(items_to_update)?;
2344
2345 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2347 let schema_name = schema.name().schema.clone();
2348 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2349 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2350 )));
2351 };
2352
2353 let database_name = database_spec
2355 .id()
2356 .map(|id| state.get_database(&id).name.clone());
2357 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2358 id: schema_id.to_string(),
2359 old_name: schema.name().schema.clone(),
2360 new_name: new_name.clone(),
2361 database_name,
2362 });
2363 CatalogState::add_to_audit_log(
2364 &state.system_configuration,
2365 oracle_write_ts,
2366 session,
2367 tx,
2368 audit_events,
2369 EventType::Alter,
2370 mz_audit_log::ObjectType::Schema,
2371 details,
2372 )?;
2373
2374 let mut new_schema = schema.clone();
2376 new_schema.name.schema.clone_from(&new_name);
2377 tx.update_schema(schema_id, new_schema.into())?;
2378
2379 for id in updates {
2380 Self::log_update(state, id);
2381 }
2382 }
2383 Op::UpdateOwner { id, new_owner } => {
2384 let conn_id = session
2385 .map(|session| session.conn_id())
2386 .unwrap_or(&SYSTEM_CONN_ID);
2387 let old_owner = state
2388 .get_owner_id(&id, conn_id)
2389 .expect("cannot update the owner of an object without an owner");
2390 match &id {
2391 ObjectId::Cluster(id) => {
2392 let mut cluster = state.get_cluster(*id).clone();
2393 if id.is_system() {
2394 return Err(AdapterError::Catalog(Error::new(
2395 ErrorKind::ReadOnlyCluster(cluster.name),
2396 )));
2397 }
2398 Self::update_privilege_owners(
2399 &mut cluster.privileges,
2400 cluster.owner_id,
2401 new_owner,
2402 );
2403 cluster.owner_id = new_owner;
2404 tx.update_cluster(*id, cluster.into())?;
2405 }
2406 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2407 let cluster = state.get_cluster(*cluster_id);
2408 let mut replica = cluster
2409 .replica(*replica_id)
2410 .expect("catalog out of sync")
2411 .clone();
2412 if replica_id.is_system() {
2413 return Err(AdapterError::Catalog(Error::new(
2414 ErrorKind::ReadOnlyClusterReplica(replica.name),
2415 )));
2416 }
2417 replica.owner_id = new_owner;
2418 tx.update_cluster_replica(*replica_id, replica.into())?;
2419 }
2420 ObjectId::Database(id) => {
2421 let mut database = state.get_database(id).clone();
2422 if id.is_system() {
2423 return Err(AdapterError::Catalog(Error::new(
2424 ErrorKind::ReadOnlyDatabase(database.name),
2425 )));
2426 }
2427 Self::update_privilege_owners(
2428 &mut database.privileges,
2429 database.owner_id,
2430 new_owner,
2431 );
2432 database.owner_id = new_owner;
2433 tx.update_database(*id, database.clone().into())?;
2434 }
2435 ObjectId::Schema((database_spec, schema_spec)) => {
2436 let schema_id: SchemaId = schema_spec.clone().into();
2437 let mut schema = state
2438 .get_schema(database_spec, schema_spec, conn_id)
2439 .clone();
2440 if schema_id.is_system() {
2441 let name = schema.name();
2442 let full_name = state.resolve_full_schema_name(name);
2443 return Err(AdapterError::Catalog(Error::new(
2444 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2445 )));
2446 }
2447 Self::update_privilege_owners(
2448 &mut schema.privileges,
2449 schema.owner_id,
2450 new_owner,
2451 );
2452 schema.owner_id = new_owner;
2453 tx.update_schema(schema_id, schema.into())?;
2454 }
2455 ObjectId::Item(id) => {
2456 let entry = state.get_entry(id);
2457 let mut new_entry = entry.clone();
2458 if id.is_system() {
2459 let full_name = state.resolve_full_name(
2460 new_entry.name(),
2461 session.map(|session| session.conn_id()),
2462 );
2463 return Err(AdapterError::Catalog(Error::new(
2464 ErrorKind::ReadOnlyItem(full_name.to_string()),
2465 )));
2466 }
2467 Self::update_privilege_owners(
2468 &mut new_entry.privileges,
2469 new_entry.owner_id,
2470 new_owner,
2471 );
2472 new_entry.owner_id = new_owner;
2473 if !new_entry.item().is_temporary() {
2474 tx.update_item(*id, new_entry.into())?;
2475 } else {
2476 temporary_item_updates
2477 .push((entry.clone().into(), StateDiff::Retraction));
2478 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2479 }
2480 }
2481 ObjectId::NetworkPolicy(id) => {
2482 let mut policy = state.get_network_policy(id).clone();
2483 if id.is_system() {
2484 return Err(AdapterError::Catalog(Error::new(
2485 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2486 )));
2487 }
2488 Self::update_privilege_owners(
2489 &mut policy.privileges,
2490 policy.owner_id,
2491 new_owner,
2492 );
2493 policy.owner_id = new_owner;
2494 tx.update_network_policy(*id, policy.into())?;
2495 }
2496 ObjectId::Role(_) => unreachable!("roles have no owner"),
2497 }
2498 let object_type = state.get_object_type(&id);
2499 CatalogState::add_to_audit_log(
2500 &state.system_configuration,
2501 oracle_write_ts,
2502 session,
2503 tx,
2504 audit_events,
2505 EventType::Alter,
2506 object_type_to_audit_object_type(object_type),
2507 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2508 object_id: id.to_string(),
2509 old_owner_id: old_owner.to_string(),
2510 new_owner_id: new_owner.to_string(),
2511 }),
2512 )?;
2513 }
2514 Op::UpdateClusterConfig { id, name, config } => {
2515 let mut cluster = state.get_cluster(id).clone();
2516 cluster.config = config;
2517 tx.update_cluster(id, cluster.into())?;
2518 info!("update cluster {}", name);
2519
2520 CatalogState::add_to_audit_log(
2521 &state.system_configuration,
2522 oracle_write_ts,
2523 session,
2524 tx,
2525 audit_events,
2526 EventType::Alter,
2527 ObjectType::Cluster,
2528 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2529 id: id.to_string(),
2530 name,
2531 }),
2532 )?;
2533 }
2534 Op::UpdateClusterReplicaConfig {
2535 replica_id,
2536 cluster_id,
2537 config,
2538 } => {
2539 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2540 info!("update replica {}", replica.name);
2541 tx.update_cluster_replica(
2542 replica_id,
2543 mz_catalog::durable::ClusterReplica {
2544 cluster_id,
2545 replica_id,
2546 name: replica.name.clone(),
2547 config: config.clone().into(),
2548 owner_id: replica.owner_id,
2549 },
2550 )?;
2551 }
2552 Op::UpdateItem { id, name, to_item } => {
2553 let mut entry = state.get_entry(&id).clone();
2554 entry.name = name.clone();
2555 entry.item = to_item.clone();
2556 tx.update_item(id, entry.into())?;
2557
2558 if Self::should_audit_log_item(&to_item) {
2559 let mut full_name = Self::full_name_detail(
2560 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2561 );
2562 full_name.item = name.item;
2563
2564 CatalogState::add_to_audit_log(
2565 &state.system_configuration,
2566 oracle_write_ts,
2567 session,
2568 tx,
2569 audit_events,
2570 EventType::Alter,
2571 catalog_type_to_audit_object_type(to_item.typ()),
2572 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2573 id: id.to_string(),
2574 name: full_name,
2575 }),
2576 )?;
2577 }
2578
2579 Self::log_update(state, &id);
2580 }
2581 Op::UpdateSystemConfiguration { name, value } => {
2582 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2583 tx.upsert_system_config(&name, parsed_value.clone())?;
2584 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2589 let with_0dt_deployment_max_wait =
2590 Duration::parse(VarInput::Flat(&parsed_value))
2591 .expect("parsing succeeded above");
2592 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2593 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2594 let with_0dt_deployment_ddl_check_interval =
2595 Duration::parse(VarInput::Flat(&parsed_value))
2596 .expect("parsing succeeded above");
2597 tx.set_0dt_deployment_ddl_check_interval(
2598 with_0dt_deployment_ddl_check_interval,
2599 )?;
2600 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2601 let panic_after_timeout =
2602 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2603 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2604 }
2605
2606 CatalogState::add_to_audit_log(
2607 &state.system_configuration,
2608 oracle_write_ts,
2609 session,
2610 tx,
2611 audit_events,
2612 EventType::Alter,
2613 ObjectType::System,
2614 EventDetails::SetV1(mz_audit_log::SetV1 {
2615 name,
2616 value: Some(value.borrow().to_vec().join(", ")),
2617 }),
2618 )?;
2619 }
2620 Op::ResetSystemConfiguration { name } => {
2621 tx.remove_system_config(&name);
2622 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2627 tx.reset_0dt_deployment_max_wait()?;
2628 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2629 tx.reset_0dt_deployment_ddl_check_interval()?;
2630 }
2631
2632 CatalogState::add_to_audit_log(
2633 &state.system_configuration,
2634 oracle_write_ts,
2635 session,
2636 tx,
2637 audit_events,
2638 EventType::Alter,
2639 ObjectType::System,
2640 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2641 )?;
2642 }
2643 Op::ResetAllSystemConfiguration => {
2644 tx.clear_system_configs();
2645 tx.reset_0dt_deployment_max_wait()?;
2646 tx.reset_0dt_deployment_ddl_check_interval()?;
2647
2648 CatalogState::add_to_audit_log(
2649 &state.system_configuration,
2650 oracle_write_ts,
2651 session,
2652 tx,
2653 audit_events,
2654 EventType::Alter,
2655 ObjectType::System,
2656 EventDetails::ResetAllV1,
2657 )?;
2658 }
2659 Op::WeirdStorageUsageUpdates {
2660 object_id,
2661 size_bytes,
2662 collection_timestamp,
2663 } => {
2664 let id = tx.allocate_storage_usage_ids()?;
2665 let metric =
2666 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2667 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2668 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2669 weird_builtin_table_update = Some(builtin_table_update);
2670 }
2671 };
2672 Ok((weird_builtin_table_update, temporary_item_updates))
2673 }
2674
2675 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2676 let entry = state.get_entry(id);
2677 info!(
2678 "update {} {} ({})",
2679 entry.item_type(),
2680 state.resolve_full_name(entry.name(), entry.conn_id()),
2681 id
2682 );
2683 }
2684
2685 fn update_privilege_owners(
2689 privileges: &mut PrivilegeMap,
2690 old_owner: RoleId,
2691 new_owner: RoleId,
2692 ) {
2693 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2695
2696 let mut new_present = false;
2697 for privilege in flat_privileges.iter_mut() {
2698 if privilege.grantor == old_owner {
2701 privilege.grantor = new_owner;
2702 } else if privilege.grantor == new_owner {
2703 new_present = true;
2704 }
2705 if privilege.grantee == old_owner {
2707 privilege.grantee = new_owner;
2708 } else if privilege.grantee == new_owner {
2709 new_present = true;
2710 }
2711 }
2712
2713 if new_present {
2717 let privilege_map: BTreeMap<_, Vec<_>> =
2719 flat_privileges
2720 .into_iter()
2721 .fold(BTreeMap::new(), |mut accum, privilege| {
2722 accum
2723 .entry((privilege.grantee, privilege.grantor))
2724 .or_default()
2725 .push(privilege);
2726 accum
2727 });
2728
2729 flat_privileges = privilege_map
2731 .into_iter()
2732 .map(|((grantee, grantor), values)|
2733 values.into_iter().fold(
2735 MzAclItem::empty(grantee, grantor),
2736 |mut accum, mz_aclitem| {
2737 accum.acl_mode =
2738 accum.acl_mode.union(mz_aclitem.acl_mode);
2739 accum
2740 },
2741 ))
2742 .collect();
2743 }
2744
2745 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2746 }
2747}
2748
2749fn tx_replace_item(
2758 tx: &mut Transaction<'_>,
2759 state: &CatalogState,
2760 id: CatalogItemId,
2761 new_entry: CatalogEntry,
2762) -> Result<(), AdapterError> {
2763 let new_id = new_entry.id;
2764
2765 for use_id in new_entry.referenced_by() {
2767 if tx.get_item(use_id).is_none() {
2769 continue;
2770 }
2771
2772 let mut dependent = state.get_entry(use_id).clone();
2773 dependent.item = dependent.item.replace_item_refs(id, new_id);
2774 tx.update_item(*use_id, dependent.into())?;
2775 }
2776
2777 let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2779 let new_comment_id = new_entry.comment_object_id();
2780 if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2781 tx.drop_comments(&[old_comment_id].into())?;
2782 for (sub, comment) in comments {
2783 tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2784 }
2785 }
2786
2787 let mz_catalog::durable::Item {
2788 id: _,
2789 oid,
2790 global_id,
2791 schema_id,
2792 name,
2793 create_sql,
2794 owner_id,
2795 privileges,
2796 extra_versions,
2797 } = new_entry.into();
2798
2799 tx.remove_item(id)?;
2800 tx.insert_item(
2801 new_id,
2802 oid,
2803 global_id,
2804 schema_id,
2805 &name,
2806 create_sql,
2807 owner_id,
2808 privileges,
2809 extra_versions,
2810 )?;
2811
2812 Ok(())
2813}
2814
2815fn apply_replacement_audit_events(
2817 state: &CatalogState,
2818 target: &CatalogEntry,
2819 replacement: &CatalogEntry,
2820) -> Vec<(EventType, EventDetails)> {
2821 let mut events = Vec::new();
2822
2823 let target_name = state.resolve_full_name(target.name(), target.conn_id());
2824 let target_id_name = IdFullNameV1 {
2825 id: target.id().to_string(),
2826 name: Catalog::full_name_detail(&target_name),
2827 };
2828 let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2829 let replacement_id_name = IdFullNameV1 {
2830 id: replacement.id().to_string(),
2831 name: Catalog::full_name_detail(&replacement_name),
2832 };
2833
2834 if Catalog::should_audit_log_item(&replacement.item) {
2835 events.push((
2836 EventType::Drop,
2837 EventDetails::IdFullNameV1(replacement_id_name.clone()),
2838 ));
2839 }
2840
2841 if Catalog::should_audit_log_item(&target.item) {
2842 events.push((
2843 EventType::Alter,
2844 EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2845 target: target_id_name.clone(),
2846 replacement: replacement_id_name,
2847 }),
2848 ));
2849
2850 if let Some(old_cluster_id) = target.cluster_id()
2851 && let Some(new_cluster_id) = replacement.cluster_id()
2852 && old_cluster_id != new_cluster_id
2853 {
2854 events.push((
2857 EventType::Alter,
2858 EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2859 id: replacement.id().to_string(),
2860 name: target_id_name.name,
2861 old_cluster_id: old_cluster_id.to_string(),
2862 new_cluster_id: new_cluster_id.to_string(),
2863 }),
2864 ));
2865 }
2866 }
2867
2868 events
2869}
2870
/// The set of catalog objects collected for dropping, grouped by object kind.
///
/// Instances are built by [`ObjectsToDrop::generate`], which validates each
/// requested drop before recording it here.
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    /// Comment IDs of every object that was submitted for dropping.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, mapped to the database specifier they were requested
    /// with.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop.
    pub clusters: BTreeSet<ClusterId>,
    /// Replicas to drop, mapped to the ID of their cluster and the reason
    /// supplied with the drop request.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop.
    pub roles: BTreeSet<RoleId>,
    /// Items to drop, in the order they were recorded. Besides explicitly
    /// requested items this also holds materialized views that were added
    /// because their target replica is being dropped.
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
2889
2890impl ObjectsToDrop {
2891 pub fn generate(
2892 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2893 state: &CatalogState,
2894 session: Option<&ConnMeta>,
2895 ) -> Result<Self, AdapterError> {
2896 let mut delta = ObjectsToDrop::default();
2897
2898 for drop_object_info in drop_object_infos {
2899 delta.add_item(drop_object_info, state, session)?;
2900 }
2901
2902 Ok(delta)
2903 }
2904
2905 fn add_item(
2906 &mut self,
2907 drop_object_info: DropObjectInfo,
2908 state: &CatalogState,
2909 session: Option<&ConnMeta>,
2910 ) -> Result<(), AdapterError> {
2911 self.comments
2912 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2913
2914 match drop_object_info {
2915 DropObjectInfo::Database(database_id) => {
2916 let database = &state.database_by_id[&database_id];
2917 if database_id.is_system() {
2918 return Err(AdapterError::Catalog(Error::new(
2919 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2920 )));
2921 }
2922
2923 self.databases.insert(database_id);
2924 }
2925 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2926 let schema = state.get_schema(
2927 &database_spec,
2928 &schema_spec,
2929 session
2930 .map(|session| session.conn_id())
2931 .unwrap_or(&SYSTEM_CONN_ID),
2932 );
2933 let schema_id: SchemaId = schema_spec.into();
2934 if schema_id.is_system() {
2935 let name = schema.name();
2936 let full_name = state.resolve_full_schema_name(name);
2937 return Err(AdapterError::Catalog(Error::new(
2938 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2939 )));
2940 }
2941
2942 self.schemas.insert(schema_spec, database_spec);
2943 }
2944 DropObjectInfo::Role(role_id) => {
2945 let name = state.get_role(&role_id).name().to_string();
2946 if role_id.is_system() || role_id.is_predefined() {
2947 return Err(AdapterError::Catalog(Error::new(
2948 ErrorKind::ReservedRoleName(name.clone()),
2949 )));
2950 }
2951 state.ensure_not_reserved_role(&role_id)?;
2952
2953 self.roles.insert(role_id);
2954 }
2955 DropObjectInfo::Cluster(cluster_id) => {
2956 let cluster = state.get_cluster(cluster_id);
2957 let name = &cluster.name;
2958 if cluster_id.is_system() {
2959 return Err(AdapterError::Catalog(Error::new(
2960 ErrorKind::ReadOnlyCluster(name.clone()),
2961 )));
2962 }
2963
2964 self.clusters.insert(cluster_id);
2965 }
2966 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2967 let cluster = state.get_cluster(cluster_id);
2968 let replica = cluster.replica(replica_id).expect("Must exist");
2969
2970 self.replicas
2971 .insert(replica.replica_id, (cluster.id, reason));
2972
2973 for (_id, entry) in state.get_entries() {
2977 if let CatalogItem::MaterializedView(mv) = entry.item() {
2978 if mv.target_replica == Some(replica_id)
2979 && !self.items.contains(&entry.id())
2980 {
2981 tracing::warn!(
2982 "implicitly dropping materialized view {} because target replica was dropped",
2983 entry.name().item,
2984 );
2985 self.items.push(entry.id());
2986 }
2987 }
2988 }
2989 }
2990 DropObjectInfo::Item(item_id) => {
2991 let entry = state.get_entry(&item_id);
2992 if item_id.is_system() {
2993 let name = entry.name();
2994 let full_name =
2995 state.resolve_full_name(name, session.map(|session| session.conn_id()));
2996 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2997 full_name.to_string(),
2998 ))));
2999 }
3000
3001 self.items.push(item_id);
3002 }
3003 DropObjectInfo::NetworkPolicy(network_policy_id) => {
3004 let policy = state.get_network_policy(&network_policy_id);
3005 let name = &policy.name;
3006 if network_policy_id.is_system() {
3007 return Err(AdapterError::Catalog(Error::new(
3008 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3009 )));
3010 }
3011
3012 self.network_policies.insert(network_policy_id);
3013 }
3014 }
3015
3016 Ok(())
3017 }
3018}
3019
3020#[cfg(test)]
3021mod tests {
3022 use mz_catalog::SYSTEM_CONN_ID;
3023 use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
3024 use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
3025 use mz_repr::role_id::RoleId;
3026 use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
3027 use mz_sql::DEFAULT_SCHEMA;
3028 use mz_sql::catalog::CatalogDatabase;
3029 use mz_sql::names::{
3030 ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
3031 };
3032 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
3033
3034 use crate::catalog::{Catalog, Op};
3035 use crate::session::DEFAULT_DATABASE_NAME;
3036
3037 #[mz_ore::test]
3038 fn test_update_privilege_owners() {
3039 let old_owner = RoleId::User(1);
3040 let new_owner = RoleId::User(2);
3041 let other_role = RoleId::User(3);
3042
3043 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3045 MzAclItem {
3046 grantee: other_role,
3047 grantor: old_owner,
3048 acl_mode: AclMode::UPDATE,
3049 },
3050 MzAclItem {
3051 grantee: other_role,
3052 grantor: new_owner,
3053 acl_mode: AclMode::SELECT,
3054 },
3055 ]);
3056 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3057 assert_eq!(1, privileges.all_values().count());
3058 assert_eq!(
3059 vec![MzAclItem {
3060 grantee: other_role,
3061 grantor: new_owner,
3062 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3063 }],
3064 privileges.all_values_owned().collect::<Vec<_>>()
3065 );
3066
3067 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3069 MzAclItem {
3070 grantee: old_owner,
3071 grantor: other_role,
3072 acl_mode: AclMode::UPDATE,
3073 },
3074 MzAclItem {
3075 grantee: new_owner,
3076 grantor: other_role,
3077 acl_mode: AclMode::SELECT,
3078 },
3079 ]);
3080 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3081 assert_eq!(1, privileges.all_values().count());
3082 assert_eq!(
3083 vec![MzAclItem {
3084 grantee: new_owner,
3085 grantor: other_role,
3086 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3087 }],
3088 privileges.all_values_owned().collect::<Vec<_>>()
3089 );
3090
3091 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3093 MzAclItem {
3094 grantee: old_owner,
3095 grantor: old_owner,
3096 acl_mode: AclMode::UPDATE,
3097 },
3098 MzAclItem {
3099 grantee: new_owner,
3100 grantor: new_owner,
3101 acl_mode: AclMode::SELECT,
3102 },
3103 ]);
3104 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3105 assert_eq!(1, privileges.all_values().count());
3106 assert_eq!(
3107 vec![MzAclItem {
3108 grantee: new_owner,
3109 grantor: new_owner,
3110 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3111 }],
3112 privileges.all_values_owned().collect::<Vec<_>>()
3113 );
3114 }
3115
3116 #[mz_ore::test(tokio::test)]
3123 #[cfg_attr(miri, ignore)] async fn test_transact_incremental_dry_run_processes_only_new_ops() {
3125 Catalog::with_debug(|catalog| async move {
3126 let database = catalog
3128 .resolve_database(DEFAULT_DATABASE_NAME)
3129 .expect("default database");
3130 let database_name = database.name.clone();
3131 let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
3132 let schema = catalog
3133 .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
3134 .expect("default schema");
3135 let schema_name = schema.name.schema.clone();
3136 let schema_spec = schema.id.clone();
3137
3138 let (id_t1, global_id_t1) = catalog
3140 .allocate_user_id_for_test()
3141 .await
3142 .expect("allocate id for t1");
3143 let (id_t2, global_id_t2) = catalog
3144 .allocate_user_id_for_test()
3145 .await
3146 .expect("allocate id for t2");
3147
3148 let oracle_write_ts = catalog.current_upper().await;
3149
3150 let make_table_op = |id, global_id, name: &str| Op::CreateItem {
3152 id,
3153 name: QualifiedItemName {
3154 qualifiers: ItemQualifiers {
3155 database_spec: database_spec.clone(),
3156 schema_spec: schema_spec.clone(),
3157 },
3158 item: name.to_string(),
3159 },
3160 item: CatalogItem::Table(Table {
3161 create_sql: Some(format!(
3162 "CREATE TABLE {database_name}.{schema_name}.{name} ()"
3163 )),
3164 desc: VersionedRelationDesc::new(RelationDesc::empty()),
3165 collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
3166 conn_id: None,
3167 resolved_ids: ResolvedIds::empty(),
3168 custom_logical_compaction_window: None,
3169 is_retained_metrics_object: false,
3170 data_source: TableDataSource::TableWrites { defaults: vec![] },
3171 }),
3172 owner_id: MZ_SYSTEM_ROLE_ID,
3173 };
3174
3175 let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
3176 let op_t2 = make_table_op(id_t2, global_id_t2, "t2");
3177
3178 let base_state = catalog.state().clone();
3179
3180 let (state_after_t1, snapshot_after_t1) = catalog
3184 .transact_incremental_dry_run(
3185 &base_state,
3186 vec![op_t1.clone()],
3187 None,
3188 None,
3189 oracle_write_ts,
3190 )
3191 .await
3192 .expect("first dry run");
3193
3194 assert!(
3196 state_after_t1.try_get_entry(&id_t1).is_some(),
3197 "t1 should exist after first dry run"
3198 );
3199 assert_eq!(
3200 state_after_t1
3201 .try_get_entry(&id_t1)
3202 .expect("t1 entry")
3203 .name()
3204 .item,
3205 "t1"
3206 );
3207 assert!(
3208 state_after_t1.try_get_entry(&id_t2).is_none(),
3209 "t2 should NOT exist after first dry run"
3210 );
3211
3212 let (state_incremental, _) = catalog
3214 .transact_incremental_dry_run(
3215 &state_after_t1,
3216 vec![op_t2.clone()],
3217 None,
3218 Some(snapshot_after_t1),
3219 oracle_write_ts,
3220 )
3221 .await
3222 .expect("second dry run");
3223
3224 assert!(
3226 state_incremental.try_get_entry(&id_t1).is_some(),
3227 "t1 should exist in incremental result"
3228 );
3229 assert!(
3230 state_incremental.try_get_entry(&id_t2).is_some(),
3231 "t2 should exist in incremental result"
3232 );
3233
3234 let (state_all_at_once, _) = catalog
3237 .transact_incremental_dry_run(
3238 &base_state,
3239 vec![op_t1.clone(), op_t2.clone()],
3240 None,
3241 None,
3242 oracle_write_ts,
3243 )
3244 .await
3245 .expect("all-at-once dry run");
3246
3247 assert!(
3248 state_all_at_once.try_get_entry(&id_t1).is_some(),
3249 "t1 should exist in all-at-once result"
3250 );
3251 assert!(
3252 state_all_at_once.try_get_entry(&id_t2).is_some(),
3253 "t2 should exist in all-at-once result"
3254 );
3255
3256 let inc_t1 = state_incremental.try_get_entry(&id_t1).expect("inc t1");
3259 let all_t1 = state_all_at_once.try_get_entry(&id_t1).expect("all t1");
3260 assert_eq!(inc_t1.name(), all_t1.name());
3261 assert_eq!(inc_t1.owner_id, all_t1.owner_id);
3262
3263 let inc_t2 = state_incremental.try_get_entry(&id_t2).expect("inc t2");
3264 let all_t2 = state_all_at_once.try_get_entry(&id_t2).expect("all t2");
3265 assert_eq!(inc_t2.name(), all_t2.name());
3266 assert_eq!(inc_t2.owner_id, all_t2.owner_id);
3267
3268 catalog.expire().await;
3269 })
3270 .await
3271 }
3272}