1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35 StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::optimize::OptimizerFeatures;
45use mz_repr::role_id::RoleId;
46use mz_repr::{CatalogItemId, ColumnName, GlobalId, SqlColumnType, strconv};
47use mz_sql::ast::RawDataType;
48use mz_sql::catalog::{
49 AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
50 CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
51 DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
52 RoleVars,
53};
54use mz_sql::names::{
55 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use serde::{Deserialize, Serialize};
66use tracing::{info, trace};
67
68use crate::AdapterError;
69use crate::catalog::state::LocalExpressionCache;
70use crate::catalog::{
71 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
72 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
73 is_reserved_role_name, object_type_to_audit_object_type,
74 system_object_type_to_audit_object_type,
75};
76use crate::coord::ConnMeta;
77use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
78use crate::coord::cluster_scheduling::SchedulingDecision;
79use crate::util::ResultExt;
80
/// Description of one audit event carried by [`Op::InjectAuditEvents`]: the event type, the
/// kind of object it concerns, the event details, and an optional user.
///
/// NOTE(review): how these events end up in the audit log is handled by the
/// `Op::InjectAuditEvents` arm of `Catalog::transact_op` — confirm the exact semantics there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InjectedAuditEvent {
    /// The kind of event (e.g. create, alter).
    pub event_type: EventType,
    /// The kind of object the event is about.
    pub object_type: ObjectType,
    /// The event payload.
    pub details: EventDetails,
    /// The user to attribute the event to, if any.
    pub user: Option<String>,
}
92
/// A single catalog operation.
///
/// `Catalog::transact_op` consumes one `Op` at a time and stages the corresponding changes on
/// the durable catalog transaction.
#[derive(Debug, Clone)]
pub enum Op {
    /// Changes the retain-history setting of the item `id`. `value` and `window` are both
    /// handed to the item; `value` is also what the audit log records as the new history.
    /// Rejected with a read-only error for system items.
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Changes the timestamp interval of the item `id`. `value` and `interval` are both handed
    /// to the item; `value` is also what the audit log records as the new interval.
    /// Rejected with a read-only error for system items.
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        value: Option<Value>,
        interval: Duration,
    },
    /// Replaces the attributes and variables of the role `id`. When `nopassword` is set the
    /// password is cleared; otherwise a password contained in `attributes` (if any) is set.
    /// `name` is used for the audit log entry and the log message.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    /// Replaces the rules of the network policy `id`. Reserved names are rejected.
    /// `owner_id` is ignored when the op is applied.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds a column to the table `id`. The new table version is recorded under
    /// `new_global_id`, which is registered against the shard looked up for the table's
    /// latest global id.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    /// Applies the materialized view `replacement_id` as the replacement of the materialized
    /// view `id`. Both ids must refer to materialized views.
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    /// Creates a user database named `name` together with its default schema.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Creates a user schema in `database_id`. Reserved schema names and the ambient database
    /// specifier are rejected.
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Creates a user role. Reserved role names are rejected.
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    /// Creates a user cluster with the given introspection sources. Reserved cluster names are
    /// rejected.
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Creates a replica of the cluster `cluster_id`. Reserved replica names are rejected.
    /// `reason` is converted into audit log form for managed replicas.
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item. Items that report a connection id are tracked as temporary
    /// items by `Catalog::temporary_ids`.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the listed objects. `Catalog::transact` inspects the item ids in here up front so
    /// that a temporary item dropped in a transaction does not conflict with a same-named
    /// temporary item created in the same transaction.
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    /// Carries a list of [`InjectedAuditEvent`]s.
    // NOTE(review): presumably writes the given events to the audit log — confirm against the
    // corresponding `transact_op` arm.
    InjectAuditEvents {
        events: Vec<InjectedAuditEvent>,
    },
}
256
/// Identifies one object to drop as part of [`Op::DropObjects`].
///
/// The variants mirror those of `ObjectId`, except that a cluster replica additionally carries
/// the [`ReplicaCreateDropReason`] for the drop.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// The replica to drop, plus the reason why it is being dropped.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
270
271impl DropObjectInfo {
272 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
275 match id {
276 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
277 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
278 cluster_id,
279 replica_id,
280 ReplicaCreateDropReason::Manual,
281 )),
282 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
283 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
284 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
285 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
286 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
287 }
288 }
289
290 fn to_object_id(&self) -> ObjectId {
293 match &self {
294 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
295 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
296 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
297 }
298 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
299 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
300 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
301 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
302 DropObjectInfo::NetworkPolicy(network_policy_id) => {
303 ObjectId::NetworkPolicy(network_policy_id.clone())
304 }
305 }
306 }
307}
308
/// The reason a cluster replica is created or dropped. Converted into its audit log
/// representation by [`ReplicaCreateDropReason::into_audit_log`].
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// Reported to the audit log as `CreateOrDropClusterReplicaReasonV1::Manual`, without
    /// scheduling details.
    Manual,
    /// Reported to the audit log as `CreateOrDropClusterReplicaReasonV1::Schedule`, together
    /// with the audit log form of the contained scheduling decisions.
    ClusterScheduling(Vec<SchedulingDecision>),
}
321
322impl ReplicaCreateDropReason {
323 pub fn into_audit_log(
324 self,
325 ) -> (
326 CreateOrDropClusterReplicaReasonV1,
327 Option<SchedulingDecisionsWithReasonsV2>,
328 ) {
329 let (reason, scheduling_policies) = match self {
330 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
331 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
332 CreateOrDropClusterReplicaReasonV1::Schedule,
333 Some(scheduling_decisions),
334 ),
335 };
336 (
337 reason,
338 scheduling_policies
339 .as_ref()
340 .map(SchedulingDecision::reasons_to_audit_log_reasons),
341 )
342 }
343}
344
/// The outputs collected while applying a catalog transaction, as returned by
/// `Catalog::transact`.
pub struct TransactionResult {
    /// Builtin table updates produced by applying the transaction's state updates.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog state updates produced by applying the transaction's state updates.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit events generated by the transaction's ops.
    pub audit_events: Vec<VersionedEvent>,
}
351
/// Selects how `Catalog::transact_inner` treats storage state.
#[derive(Debug, Clone, Copy)]
enum TransactInnerMode {
    /// If a storage collections handle was passed, its state is prepared as part of the
    /// transaction.
    Commit,
    /// Storage state must not be prepared; `transact_inner` debug-asserts that no storage
    /// collections handle was passed.
    DryRun,
}
366
367impl Catalog {
    /// Returns whether changes to `item` should be recorded in the audit log: `true` for every
    /// item that is not temporary.
    fn should_audit_log_item(item: &CatalogItem) -> bool {
        !item.is_temporary()
    }
371
372 fn temporary_ids(
375 &self,
376 ops: &[Op],
377 temporary_drops: BTreeSet<(&ConnectionId, String)>,
378 ) -> Result<BTreeSet<CatalogItemId>, Error> {
379 let mut creating = BTreeSet::new();
380 let mut temporary_ids = BTreeSet::new();
381 for op in ops.iter() {
382 if let Op::CreateItem {
383 id,
384 name,
385 item,
386 owner_id: _,
387 } = op
388 {
389 if let Some(conn_id) = item.conn_id() {
390 if self.item_exists_in_temp_schemas(conn_id, &name.item)
391 && !temporary_drops.contains(&(conn_id, name.item.clone()))
392 || creating.contains(&(conn_id, &name.item))
393 {
394 return Err(
395 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
396 );
397 } else {
398 creating.insert((conn_id, &name.item));
399 temporary_ids.insert(id.clone());
400 }
401 }
402 }
403 }
404 Ok(temporary_ids)
405 }
406
407 #[instrument(name = "catalog::transact")]
408 pub async fn transact(
409 &mut self,
410 storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
413 oracle_write_ts: mz_repr::Timestamp,
414 session: Option<&ConnMeta>,
415 ops: Vec<Op>,
416 ) -> Result<TransactionResult, AdapterError> {
417 trace!("transact: {:?}", ops);
418 fail::fail_point!("catalog_transact", |arg| {
419 Err(AdapterError::Unstructured(anyhow::anyhow!(
420 "failpoint: {arg:?}"
421 )))
422 });
423
424 let drop_ids: BTreeSet<CatalogItemId> = ops
425 .iter()
426 .filter_map(|op| match op {
427 Op::DropObjects(drop_object_infos) => {
428 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
429 let item_ids = ids.filter_map(|id| match id {
430 ObjectId::Item(id) => Some(id),
431 _ => None,
432 });
433 Some(item_ids)
434 }
435 _ => None,
436 })
437 .flatten()
438 .collect();
439 let temporary_drops = drop_ids
440 .iter()
441 .filter_map(|id| {
442 let entry = self.get_entry(id);
443 match entry.item().conn_id() {
444 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
445 None => None,
446 }
447 })
448 .collect();
449
450 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
451 let mut builtin_table_updates = vec![];
452 let mut catalog_updates = vec![];
453 let mut audit_events = vec![];
454 let mut storage = self.storage().await;
455 let mut tx = storage
456 .transaction()
457 .await
458 .unwrap_or_terminate("starting catalog transaction");
459
460 let new_state = Self::transact_inner(
461 TransactInnerMode::Commit,
462 storage_collections,
463 oracle_write_ts,
464 session,
465 ops,
466 temporary_ids,
467 &mut builtin_table_updates,
468 &mut catalog_updates,
469 &mut audit_events,
470 &mut tx,
471 &self.state,
472 )
473 .await?;
474
475 tx.commit(oracle_write_ts)
480 .await
481 .unwrap_or_terminate("catalog storage transaction commit must succeed");
482
483 drop(storage);
486 if let Some(new_state) = new_state {
487 self.transient_revision += 1;
488 self.state = new_state;
489 }
490
491 Ok(TransactionResult {
492 builtin_table_updates,
493 catalog_updates,
494 audit_events,
495 })
496 }
497
498 pub async fn transact_incremental_dry_run(
513 &self,
514 base_state: &CatalogState,
515 ops: Vec<Op>,
516 session: Option<&ConnMeta>,
517 prev_snapshot: Option<Snapshot>,
518 oracle_write_ts: mz_repr::Timestamp,
519 ) -> Result<(CatalogState, Snapshot), AdapterError> {
520 let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;
523
524 let mut builtin_table_updates = vec![];
525 let mut catalog_updates = vec![];
526 let mut audit_events = vec![];
527 let mut storage = self.storage().await;
528 let mut tx = if let Some(snapshot) = prev_snapshot {
529 storage
532 .transaction_from_snapshot(snapshot)
533 .unwrap_or_terminate("starting catalog transaction from snapshot")
534 } else {
535 storage
538 .transaction()
539 .await
540 .unwrap_or_terminate("starting catalog transaction")
541 };
542
543 let new_state = Self::transact_inner(
545 TransactInnerMode::DryRun,
546 None,
547 oracle_write_ts,
548 session,
549 ops,
550 temporary_ids,
551 &mut builtin_table_updates,
552 &mut catalog_updates,
553 &mut audit_events,
554 &mut tx,
555 base_state,
556 )
557 .await?;
558
559 let new_snapshot = tx.current_snapshot();
562
563 drop(storage);
565
566 let state = new_state.unwrap_or_else(|| base_state.clone());
568 Ok((state, new_snapshot))
569 }
570
571 fn extract_expressions_from_ops(
575 ops: &[Op],
576 optimizer_features: &OptimizerFeatures,
577 ) -> BTreeMap<GlobalId, LocalExpressions> {
578 let mut exprs = BTreeMap::new();
579
580 for op in ops {
581 if let Op::CreateItem { item, .. } = op {
582 match item {
583 CatalogItem::View(view) => {
584 exprs.insert(
585 view.global_id,
586 LocalExpressions {
587 local_mir: (*view.locally_optimized_expr).clone(),
588 optimizer_features: optimizer_features.clone(),
589 },
590 );
591 }
592 CatalogItem::MaterializedView(mv) => {
593 exprs.insert(
594 mv.global_id_writes(),
595 LocalExpressions {
596 local_mir: (*mv.locally_optimized_expr).clone(),
597 optimizer_features: optimizer_features.clone(),
598 },
599 );
600 }
601 CatalogItem::Table(_)
602 | CatalogItem::Source(_)
603 | CatalogItem::Log(_)
604 | CatalogItem::Sink(_)
605 | CatalogItem::Index(_)
606 | CatalogItem::Type(_)
607 | CatalogItem::Func(_)
608 | CatalogItem::Secret(_)
609 | CatalogItem::Connection(_) => {}
610 }
611 }
612 }
613
614 exprs
615 }
616
617 #[instrument(name = "catalog::transact_inner")]
625 async fn transact_inner(
626 mode: TransactInnerMode,
627 storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
628 oracle_write_ts: mz_repr::Timestamp,
629 session: Option<&ConnMeta>,
630 ops: Vec<Op>,
631 temporary_ids: BTreeSet<CatalogItemId>,
632 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
633 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
634 audit_events: &mut Vec<VersionedEvent>,
635 tx: &mut Transaction<'_>,
636 state: &CatalogState,
637 ) -> Result<Option<CatalogState>, AdapterError> {
638 let mut preliminary_state = Cow::Borrowed(state);
665
666 let mut state = Cow::Borrowed(state);
668
669 if ops.is_empty() {
670 return Ok(None);
671 }
672
673 let optimizer_features = OptimizerFeatures::from(state.system_config());
676 let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
677
678 let mut storage_collections_to_create = BTreeSet::new();
679 let mut storage_collections_to_drop = BTreeSet::new();
680 let mut storage_collections_to_register = BTreeMap::new();
681
682 let mut updates = Vec::new();
683
684 for op in ops {
685 let temporary_item_updates = Self::transact_op(
686 oracle_write_ts,
687 session,
688 op,
689 &temporary_ids,
690 audit_events,
691 tx,
692 &*preliminary_state,
693 &mut storage_collections_to_create,
694 &mut storage_collections_to_drop,
695 &mut storage_collections_to_register,
696 )
697 .await?;
698
699 let upper = tx.upper();
704 let temporary_item_updates =
705 temporary_item_updates
706 .into_iter()
707 .map(|(item, diff)| StateUpdate {
708 kind: StateUpdateKind::TemporaryItem(item),
709 ts: upper,
710 diff,
711 });
712
713 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
714 op_updates.extend(temporary_item_updates);
715 if !op_updates.is_empty() {
716 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
719 let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
720 .to_mut()
721 .apply_updates(op_updates.clone(), &mut local_expr_cache)
722 .await;
723 }
724 updates.append(&mut op_updates);
725 }
726
727 if !updates.is_empty() {
728 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
729 let (op_builtin_table_updates, op_catalog_updates) = state
730 .to_mut()
731 .apply_updates(updates.clone(), &mut local_expr_cache)
732 .await;
733 let op_builtin_table_updates = state
734 .to_mut()
735 .resolve_builtin_table_updates(op_builtin_table_updates);
736 builtin_table_updates.extend(op_builtin_table_updates);
737 parsed_catalog_updates.extend(op_catalog_updates);
738 }
739
740 match mode {
741 TransactInnerMode::Commit => {
742 if let Some(c) = storage_collections {
744 c.prepare_state(
745 tx,
746 storage_collections_to_create,
747 storage_collections_to_drop,
748 storage_collections_to_register,
749 )
750 .await?;
751 }
752 }
753 TransactInnerMode::DryRun => {
754 debug_assert!(
755 storage_collections.is_none(),
756 "dry-run mode must not prepare storage state"
757 );
758 }
759 }
760
761 let updates = tx.get_and_commit_op_updates();
762 if !updates.is_empty() {
763 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
764 let (op_builtin_table_updates, op_catalog_updates) = state
765 .to_mut()
766 .apply_updates(updates.clone(), &mut local_expr_cache)
767 .await;
768 let op_builtin_table_updates = state
769 .to_mut()
770 .resolve_builtin_table_updates(op_builtin_table_updates);
771 builtin_table_updates.extend(op_builtin_table_updates);
772 parsed_catalog_updates.extend(op_catalog_updates);
773 }
774
775 match state {
776 Cow::Owned(state) => Ok(Some(state)),
777 Cow::Borrowed(_) => Ok(None),
778 }
779 }
780
781 #[instrument]
789 async fn transact_op(
790 oracle_write_ts: mz_repr::Timestamp,
791 session: Option<&ConnMeta>,
792 op: Op,
793 temporary_ids: &BTreeSet<CatalogItemId>,
794 audit_events: &mut Vec<VersionedEvent>,
795 tx: &mut Transaction<'_>,
796 state: &CatalogState,
797 storage_collections_to_create: &mut BTreeSet<GlobalId>,
798 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
799 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
800 ) -> Result<Vec<(TemporaryItem, StateDiff)>, AdapterError> {
801 let mut temporary_item_updates = Vec::new();
802
803 match op {
804 Op::AlterRetainHistory { id, value, window } => {
805 let entry = state.get_entry(&id);
806 if id.is_system() {
807 let name = entry.name();
808 let full_name =
809 state.resolve_full_name(name, session.map(|session| session.conn_id()));
810 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
811 full_name.to_string(),
812 ))));
813 }
814
815 let mut new_entry = entry.clone();
816 let previous = new_entry
817 .item
818 .update_retain_history(value.clone(), window)
819 .map_err(|_| {
820 AdapterError::Catalog(Error::new(ErrorKind::Internal(
821 "planner should have rejected invalid alter retain history item type"
822 .to_string(),
823 )))
824 })?;
825
826 if Self::should_audit_log_item(new_entry.item()) {
827 let details =
828 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
829 id: id.to_string(),
830 old_history: previous.map(|previous| previous.to_string()),
831 new_history: value.map(|v| v.to_string()),
832 });
833 CatalogState::add_to_audit_log(
834 &state.system_configuration,
835 oracle_write_ts,
836 session,
837 tx,
838 audit_events,
839 EventType::Alter,
840 catalog_type_to_audit_object_type(new_entry.item().typ()),
841 details,
842 )?;
843 }
844
845 tx.update_item(id, new_entry.into())?;
846
847 Self::log_update(state, &id);
848 }
849 Op::AlterSourceTimestampInterval {
850 id,
851 value,
852 interval,
853 } => {
854 let entry = state.get_entry(&id);
855 if id.is_system() {
856 let name = entry.name();
857 let full_name =
858 state.resolve_full_name(name, session.map(|session| session.conn_id()));
859 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
860 full_name.to_string(),
861 ))));
862 }
863
864 let mut new_entry = entry.clone();
865 let previous = new_entry
866 .item
867 .update_timestamp_interval(value.clone(), interval)
868 .map_err(|_| {
869 AdapterError::Catalog(Error::new(ErrorKind::Internal(
870 "planner should have rejected invalid alter timestamp interval item type"
871 .to_string(),
872 )))
873 })?;
874
875 if Self::should_audit_log_item(new_entry.item()) {
876 let details = EventDetails::AlterSourceTimestampIntervalV1(
877 mz_audit_log::AlterSourceTimestampIntervalV1 {
878 id: id.to_string(),
879 old_interval: previous.map(|previous| previous.to_string()),
880 new_interval: value.map(|v| v.to_string()),
881 },
882 );
883 CatalogState::add_to_audit_log(
884 &state.system_configuration,
885 oracle_write_ts,
886 session,
887 tx,
888 audit_events,
889 EventType::Alter,
890 catalog_type_to_audit_object_type(new_entry.item().typ()),
891 details,
892 )?;
893 }
894
895 tx.update_item(id, new_entry.into())?;
896
897 Self::log_update(state, &id);
898 }
899 Op::AlterRole {
900 id,
901 name,
902 attributes,
903 nopassword,
904 vars,
905 } => {
906 state.ensure_not_reserved_role(&id)?;
907
908 let mut existing_role = state.get_role(&id).clone();
909 let password = attributes.password.clone();
910 let scram_iterations = attributes
911 .scram_iterations
912 .unwrap_or_else(|| state.system_config().scram_iterations());
913 existing_role.attributes = attributes.into();
914 existing_role.vars = vars;
915 let password_action = if nopassword {
916 PasswordAction::Clear
917 } else if let Some(password) = password {
918 PasswordAction::Set(PasswordConfig {
919 password,
920 scram_iterations,
921 })
922 } else {
923 PasswordAction::NoChange
924 };
925 tx.update_role(id, existing_role.into(), password_action)?;
926
927 CatalogState::add_to_audit_log(
928 &state.system_configuration,
929 oracle_write_ts,
930 session,
931 tx,
932 audit_events,
933 EventType::Alter,
934 ObjectType::Role,
935 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
936 id: id.to_string(),
937 name: name.clone(),
938 }),
939 )?;
940
941 info!("update role {name} ({id})");
942 }
943 Op::AlterNetworkPolicy {
944 id,
945 rules,
946 name,
947 owner_id: _owner_id,
948 } => {
949 let existing_policy = state.get_network_policy(&id).clone();
950 let mut policy: NetworkPolicy = existing_policy.into();
951 policy.rules = rules;
952 if is_reserved_name(&name) {
953 return Err(AdapterError::Catalog(Error::new(
954 ErrorKind::ReservedNetworkPolicyName(name),
955 )));
956 }
957 tx.update_network_policy(id, policy.clone())?;
958
959 CatalogState::add_to_audit_log(
960 &state.system_configuration,
961 oracle_write_ts,
962 session,
963 tx,
964 audit_events,
965 EventType::Alter,
966 ObjectType::NetworkPolicy,
967 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
968 id: id.to_string(),
969 name: name.clone(),
970 }),
971 )?;
972
973 info!("update network policy {name} ({id})");
974 }
975 Op::AlterAddColumn {
976 id,
977 new_global_id,
978 name,
979 typ,
980 sql,
981 } => {
982 let column_name = name.to_string();
983 let column_type = typ.to_string();
984 let nullable = typ.nullable;
985 let mut new_entry = state.get_entry(&id).clone();
986 let version = new_entry.item.add_column(name, typ, sql)?;
987 let shard_id = state
990 .storage_metadata()
991 .get_collection_shard(new_entry.latest_global_id())?;
992
993 let CatalogItem::Table(table) = &mut new_entry.item else {
995 return Err(AdapterError::Unsupported("adding columns to non-Table"));
996 };
997 table.collections.insert(version, new_global_id);
998
999 if Self::should_audit_log_item(new_entry.item()) {
1000 let details = EventDetails::AlterAddColumnV1(mz_audit_log::AlterAddColumnV1 {
1001 id: id.to_string(),
1002 column: column_name,
1003 column_type,
1004 nullable,
1005 });
1006 CatalogState::add_to_audit_log(
1007 &state.system_configuration,
1008 oracle_write_ts,
1009 session,
1010 tx,
1011 audit_events,
1012 EventType::Alter,
1013 catalog_type_to_audit_object_type(new_entry.item().typ()),
1014 details,
1015 )?;
1016 }
1017
1018 tx.update_item(id, new_entry.into())?;
1019 storage_collections_to_register.insert(new_global_id, shard_id);
1020 }
1021 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1022 let mut new_entry = state.get_entry(&id).clone();
1023 let replacement = state.get_entry(&replacement_id);
1024
1025 let new_audit_events =
1026 apply_replacement_audit_events(state, &new_entry, replacement);
1027
1028 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1029 return Err(AdapterError::internal(
1030 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1031 "id must refer to a materialized view",
1032 ));
1033 };
1034 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1035 return Err(AdapterError::internal(
1036 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1037 "replacement_id must refer to a materialized view",
1038 ));
1039 };
1040
1041 mv.apply_replacement(replacement_mv.clone());
1042
1043 tx.remove_item(replacement_id)?;
1044
1045 new_entry.id = replacement_id;
1046 tx_replace_item(tx, state, id, new_entry)?;
1047
1048 let comment_id = CommentObjectId::MaterializedView(replacement_id);
1049 tx.drop_comments(&[comment_id].into())?;
1050
1051 for (event_type, details) in new_audit_events {
1052 CatalogState::add_to_audit_log(
1053 &state.system_configuration,
1054 oracle_write_ts,
1055 session,
1056 tx,
1057 audit_events,
1058 event_type,
1059 ObjectType::MaterializedView,
1060 details,
1061 )?;
1062 }
1063 }
1064 Op::CreateDatabase { name, owner_id } => {
1065 let database_owner_privileges = vec![rbac::owner_privilege(
1066 mz_sql::catalog::ObjectType::Database,
1067 owner_id,
1068 )];
1069 let database_default_privileges = state
1070 .default_privileges
1071 .get_applicable_privileges(
1072 owner_id,
1073 None,
1074 None,
1075 mz_sql::catalog::ObjectType::Database,
1076 )
1077 .map(|item| item.mz_acl_item(owner_id));
1078 let database_privileges: Vec<_> = merge_mz_acl_items(
1079 database_owner_privileges
1080 .into_iter()
1081 .chain(database_default_privileges),
1082 )
1083 .collect();
1084
1085 let schema_owner_privileges = vec![rbac::owner_privilege(
1086 mz_sql::catalog::ObjectType::Schema,
1087 owner_id,
1088 )];
1089 let schema_default_privileges = state
1090 .default_privileges
1091 .get_applicable_privileges(
1092 owner_id,
1093 None,
1094 None,
1095 mz_sql::catalog::ObjectType::Schema,
1096 )
1097 .map(|item| item.mz_acl_item(owner_id))
1098 .chain(std::iter::once(MzAclItem {
1100 grantee: RoleId::Public,
1101 grantor: owner_id,
1102 acl_mode: AclMode::USAGE,
1103 }));
1104 let schema_privileges: Vec<_> = merge_mz_acl_items(
1105 schema_owner_privileges
1106 .into_iter()
1107 .chain(schema_default_privileges),
1108 )
1109 .collect();
1110
1111 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1112 let (database_id, _) = tx.insert_user_database(
1113 &name,
1114 owner_id,
1115 database_privileges.clone(),
1116 &temporary_oids,
1117 )?;
1118 let (schema_id, _) = tx.insert_user_schema(
1119 database_id,
1120 DEFAULT_SCHEMA,
1121 owner_id,
1122 schema_privileges.clone(),
1123 &temporary_oids,
1124 )?;
1125 CatalogState::add_to_audit_log(
1126 &state.system_configuration,
1127 oracle_write_ts,
1128 session,
1129 tx,
1130 audit_events,
1131 EventType::Create,
1132 ObjectType::Database,
1133 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1134 id: database_id.to_string(),
1135 name: name.clone(),
1136 }),
1137 )?;
1138 info!("create database {}", name);
1139
1140 CatalogState::add_to_audit_log(
1141 &state.system_configuration,
1142 oracle_write_ts,
1143 session,
1144 tx,
1145 audit_events,
1146 EventType::Create,
1147 ObjectType::Schema,
1148 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1149 id: schema_id.to_string(),
1150 name: DEFAULT_SCHEMA.to_string(),
1151 database_name: Some(name),
1152 }),
1153 )?;
1154 }
1155 Op::CreateSchema {
1156 database_id,
1157 schema_name,
1158 owner_id,
1159 } => {
1160 if is_reserved_name(&schema_name) {
1161 return Err(AdapterError::Catalog(Error::new(
1162 ErrorKind::ReservedSchemaName(schema_name),
1163 )));
1164 }
1165 let database_id = match database_id {
1166 ResolvedDatabaseSpecifier::Id(id) => id,
1167 ResolvedDatabaseSpecifier::Ambient => {
1168 return Err(AdapterError::Catalog(Error::new(
1169 ErrorKind::ReadOnlySystemSchema(schema_name),
1170 )));
1171 }
1172 };
1173 let owner_privileges = vec![rbac::owner_privilege(
1174 mz_sql::catalog::ObjectType::Schema,
1175 owner_id,
1176 )];
1177 let default_privileges = state
1178 .default_privileges
1179 .get_applicable_privileges(
1180 owner_id,
1181 Some(database_id),
1182 None,
1183 mz_sql::catalog::ObjectType::Schema,
1184 )
1185 .map(|item| item.mz_acl_item(owner_id));
1186 let privileges: Vec<_> =
1187 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1188 .collect();
1189 let (schema_id, _) = tx.insert_user_schema(
1190 database_id,
1191 &schema_name,
1192 owner_id,
1193 privileges.clone(),
1194 &state.get_temporary_oids().collect(),
1195 )?;
1196 CatalogState::add_to_audit_log(
1197 &state.system_configuration,
1198 oracle_write_ts,
1199 session,
1200 tx,
1201 audit_events,
1202 EventType::Create,
1203 ObjectType::Schema,
1204 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1205 id: schema_id.to_string(),
1206 name: schema_name.clone(),
1207 database_name: Some(state.database_by_id[&database_id].name.clone()),
1208 }),
1209 )?;
1210 }
1211 Op::CreateRole { name, attributes } => {
1212 if is_reserved_role_name(&name) {
1213 return Err(AdapterError::Catalog(Error::new(
1214 ErrorKind::ReservedRoleName(name),
1215 )));
1216 }
1217 let membership = RoleMembership::new();
1218 let vars = RoleVars::default();
1219 let (id, _) = tx.insert_user_role(
1220 name.clone(),
1221 attributes.clone(),
1222 membership.clone(),
1223 vars.clone(),
1224 &state.get_temporary_oids().collect(),
1225 )?;
1226 CatalogState::add_to_audit_log(
1227 &state.system_configuration,
1228 oracle_write_ts,
1229 session,
1230 tx,
1231 audit_events,
1232 EventType::Create,
1233 ObjectType::Role,
1234 EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1235 id: id.to_string(),
1236 name: name.clone(),
1237 auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1238 AutoProvisionSource::Oidc => "oidc".to_string(),
1239 AutoProvisionSource::Frontegg => "frontegg".to_string(),
1240 AutoProvisionSource::None => "none".to_string(),
1241 }),
1242 }),
1243 )?;
1244 info!("create role {}", name);
1245 }
1246 Op::CreateCluster {
1247 id,
1248 name,
1249 introspection_sources,
1250 owner_id,
1251 config,
1252 } => {
1253 if is_reserved_name(&name) {
1254 return Err(AdapterError::Catalog(Error::new(
1255 ErrorKind::ReservedClusterName(name),
1256 )));
1257 }
1258 let owner_privileges = vec![rbac::owner_privilege(
1259 mz_sql::catalog::ObjectType::Cluster,
1260 owner_id,
1261 )];
1262 let default_privileges = state
1263 .default_privileges
1264 .get_applicable_privileges(
1265 owner_id,
1266 None,
1267 None,
1268 mz_sql::catalog::ObjectType::Cluster,
1269 )
1270 .map(|item| item.mz_acl_item(owner_id));
1271 let privileges: Vec<_> =
1272 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1273 .collect();
1274 let introspection_source_ids: Vec<_> = introspection_sources
1275 .iter()
1276 .map(|introspection_source| {
1277 Transaction::allocate_introspection_source_index_id(
1278 &id,
1279 introspection_source.variant,
1280 )
1281 })
1282 .collect();
1283
1284 let introspection_sources = introspection_sources
1285 .into_iter()
1286 .zip_eq(introspection_source_ids)
1287 .map(|(log, (item_id, gid))| (log, item_id, gid))
1288 .collect();
1289
1290 tx.insert_user_cluster(
1291 id,
1292 &name,
1293 introspection_sources,
1294 owner_id,
1295 privileges.clone(),
1296 config.clone().into(),
1297 &state.get_temporary_oids().collect(),
1298 )?;
1299 CatalogState::add_to_audit_log(
1300 &state.system_configuration,
1301 oracle_write_ts,
1302 session,
1303 tx,
1304 audit_events,
1305 EventType::Create,
1306 ObjectType::Cluster,
1307 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1308 id: id.to_string(),
1309 name: name.clone(),
1310 }),
1311 )?;
1312 info!("create cluster {}", name);
1313 }
1314 Op::CreateClusterReplica {
1315 cluster_id,
1316 name,
1317 config,
1318 owner_id,
1319 reason,
1320 } => {
1321 if is_reserved_name(&name) {
1322 return Err(AdapterError::Catalog(Error::new(
1323 ErrorKind::ReservedReplicaName(name),
1324 )));
1325 }
1326 let cluster = state.get_cluster(cluster_id);
1327 let id =
1328 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1329 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1330 size,
1331 billed_as,
1332 internal,
1333 ..
1334 }) = &config.location
1335 {
1336 let (reason, scheduling_policies) = reason.into_audit_log();
1337 let details = EventDetails::CreateClusterReplicaV4(
1338 mz_audit_log::CreateClusterReplicaV4 {
1339 cluster_id: cluster_id.to_string(),
1340 cluster_name: cluster.name.clone(),
1341 replica_id: Some(id.to_string()),
1342 replica_name: name.clone(),
1343 logical_size: size.clone(),
1344 billed_as: billed_as.clone(),
1345 internal: *internal,
1346 reason,
1347 scheduling_policies,
1348 },
1349 );
1350 CatalogState::add_to_audit_log(
1351 &state.system_configuration,
1352 oracle_write_ts,
1353 session,
1354 tx,
1355 audit_events,
1356 EventType::Create,
1357 ObjectType::ClusterReplica,
1358 details,
1359 )?;
1360 }
1361 }
1362 Op::CreateItem {
1363 id,
1364 name,
1365 item,
1366 owner_id,
1367 } => {
1368 state.check_unstable_dependencies(&item)?;
1369
1370 match &item {
1371 CatalogItem::Table(table) => {
1372 let gids: Vec<_> = table.global_ids().collect();
1373 assert_eq!(gids.len(), 1);
1374 storage_collections_to_create.extend(gids);
1375 }
1376 CatalogItem::Source(source) => {
1377 storage_collections_to_create.insert(source.global_id());
1378 }
1379 CatalogItem::MaterializedView(mv) => {
1380 let mv_gid = mv.global_id_writes();
1381 if let Some(target_id) = mv.replacement_target {
1382 let target_gid = state.get_entry(&target_id).latest_global_id();
1383 let shard_id =
1384 state.storage_metadata().get_collection_shard(target_gid)?;
1385 storage_collections_to_register.insert(mv_gid, shard_id);
1386 } else {
1387 storage_collections_to_create.insert(mv_gid);
1388 }
1389 }
1390 CatalogItem::Sink(sink) => {
1391 storage_collections_to_create.insert(sink.global_id());
1392 }
1393 CatalogItem::Log(_)
1394 | CatalogItem::View(_)
1395 | CatalogItem::Index(_)
1396 | CatalogItem::Type(_)
1397 | CatalogItem::Func(_)
1398 | CatalogItem::Secret(_)
1399 | CatalogItem::Connection(_) => (),
1400 }
1401
1402 let system_user = session.map_or(false, |s| s.user().is_system_user());
1403 if !system_user {
1404 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1405 let cluster_name = state.clusters_by_id[&id].name.clone();
1406 return Err(AdapterError::Catalog(Error::new(
1407 ErrorKind::ReadOnlyCluster(cluster_name),
1408 )));
1409 }
1410 }
1411
1412 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1413 let default_privileges = state
1414 .default_privileges
1415 .get_applicable_privileges(
1416 owner_id,
1417 name.qualifiers.database_spec.id(),
1418 Some(name.qualifiers.schema_spec.into()),
1419 item.typ().into(),
1420 )
1421 .map(|item| item.mz_acl_item(owner_id));
1422 let progress_source_privilege = if item.is_progress_source() {
1424 Some(MzAclItem {
1425 grantee: MZ_SUPPORT_ROLE_ID,
1426 grantor: owner_id,
1427 acl_mode: AclMode::SELECT,
1428 })
1429 } else {
1430 None
1431 };
1432 let privileges: Vec<_> = merge_mz_acl_items(
1433 owner_privileges
1434 .into_iter()
1435 .chain(default_privileges)
1436 .chain(progress_source_privilege),
1437 )
1438 .collect();
1439
1440 let temporary_oids = state.get_temporary_oids().collect();
1441
1442 if item.is_temporary() {
1443 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1444 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1445 {
1446 return Err(AdapterError::Catalog(Error::new(
1447 ErrorKind::InvalidTemporarySchema,
1448 )));
1449 }
1450 let oid = tx.allocate_oid(&temporary_oids)?;
1451
1452 let schema_id = name.qualifiers.schema_spec.clone().into();
1453 let item_type = item.typ();
1454 let (create_sql, global_id, versions) = item.to_serialized();
1455
1456 let item = TemporaryItem {
1457 id,
1458 oid,
1459 global_id,
1460 schema_id,
1461 name: name.item.clone(),
1462 create_sql,
1463 conn_id: item.conn_id().cloned(),
1464 owner_id,
1465 privileges: privileges.clone(),
1466 extra_versions: versions,
1467 };
1468 temporary_item_updates.push((item, StateDiff::Addition));
1469
1470 info!(
1471 "create temporary {} {} ({})",
1472 item_type,
1473 state.resolve_full_name(&name, None),
1474 id
1475 );
1476 } else {
1477 if let Some(temp_id) =
1478 item.uses()
1479 .iter()
1480 .find(|id| match state.try_get_entry(*id) {
1481 Some(entry) => entry.item().is_temporary(),
1482 None => temporary_ids.contains(id),
1483 })
1484 {
1485 let temp_item = state.get_entry(temp_id);
1486 return Err(AdapterError::Catalog(Error::new(
1487 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1488 )));
1489 }
1490 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1491 && !system_user
1492 {
1493 let schema_name = state
1494 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1495 .schema;
1496 return Err(AdapterError::Catalog(Error::new(
1497 ErrorKind::ReadOnlySystemSchema(schema_name),
1498 )));
1499 }
1500 let schema_id = name.qualifiers.schema_spec.clone().into();
1501 let item_type = item.typ();
1502 let (create_sql, global_id, versions) = item.to_serialized();
1503 tx.insert_user_item(
1504 id,
1505 global_id,
1506 schema_id,
1507 &name.item,
1508 create_sql,
1509 owner_id,
1510 privileges.clone(),
1511 &temporary_oids,
1512 versions,
1513 )?;
1514 info!(
1515 "create {} {} ({})",
1516 item_type,
1517 state.resolve_full_name(&name, None),
1518 id
1519 );
1520 }
1521
1522 if Self::should_audit_log_item(&item) {
1523 let name = Self::full_name_detail(
1524 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1525 );
1526 let details = match &item {
1527 CatalogItem::Source(s) => {
1528 let cluster_id = match s.data_source {
1529 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1532 match state.get_entry(&ingestion_id).cluster_id() {
1533 Some(cluster_id) => Some(cluster_id.to_string()),
1534 None => None,
1535 }
1536 }
1537 _ => match item.cluster_id() {
1538 Some(cluster_id) => Some(cluster_id.to_string()),
1539 None => None,
1540 },
1541 };
1542
1543 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1544 id: id.to_string(),
1545 cluster_id,
1546 name,
1547 external_type: s.source_type().to_string(),
1548 })
1549 }
1550 CatalogItem::Sink(s) => {
1551 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1552 id: id.to_string(),
1553 cluster_id: Some(s.cluster_id.to_string()),
1554 name,
1555 external_type: s.sink_type().to_string(),
1556 })
1557 }
1558 CatalogItem::Index(i) => {
1559 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1560 id: id.to_string(),
1561 name,
1562 cluster_id: i.cluster_id.to_string(),
1563 })
1564 }
1565 CatalogItem::MaterializedView(mv) => {
1566 EventDetails::CreateMaterializedViewV1(
1567 mz_audit_log::CreateMaterializedViewV1 {
1568 id: id.to_string(),
1569 name,
1570 cluster_id: mv.cluster_id.to_string(),
1571 replacement_target_id: mv
1572 .replacement_target
1573 .map(|id| id.to_string()),
1574 },
1575 )
1576 }
1577 CatalogItem::Table(_)
1578 | CatalogItem::Log(_)
1579 | CatalogItem::View(_)
1580 | CatalogItem::Type(_)
1581 | CatalogItem::Func(_)
1582 | CatalogItem::Secret(_)
1583 | CatalogItem::Connection(_) => EventDetails::IdFullNameV1(IdFullNameV1 {
1584 id: id.to_string(),
1585 name,
1586 }),
1587 };
1588 CatalogState::add_to_audit_log(
1589 &state.system_configuration,
1590 oracle_write_ts,
1591 session,
1592 tx,
1593 audit_events,
1594 EventType::Create,
1595 catalog_type_to_audit_object_type(item.typ()),
1596 details,
1597 )?;
1598 }
1599 }
1600 Op::CreateNetworkPolicy {
1601 rules,
1602 name,
1603 owner_id,
1604 } => {
1605 if state.network_policies_by_name.contains_key(&name) {
1606 return Err(AdapterError::PlanError(PlanError::Catalog(
1607 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1608 )));
1609 }
1610 if is_reserved_name(&name) {
1611 return Err(AdapterError::Catalog(Error::new(
1612 ErrorKind::ReservedNetworkPolicyName(name),
1613 )));
1614 }
1615
1616 let owner_privileges = vec![rbac::owner_privilege(
1617 mz_sql::catalog::ObjectType::NetworkPolicy,
1618 owner_id,
1619 )];
1620 let default_privileges = state
1621 .default_privileges
1622 .get_applicable_privileges(
1623 owner_id,
1624 None,
1625 None,
1626 mz_sql::catalog::ObjectType::NetworkPolicy,
1627 )
1628 .map(|item| item.mz_acl_item(owner_id));
1629 let privileges: Vec<_> =
1630 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1631 .collect();
1632
1633 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1634 let id = tx.insert_user_network_policy(
1635 name.clone(),
1636 rules,
1637 privileges,
1638 owner_id,
1639 &temporary_oids,
1640 )?;
1641
1642 CatalogState::add_to_audit_log(
1643 &state.system_configuration,
1644 oracle_write_ts,
1645 session,
1646 tx,
1647 audit_events,
1648 EventType::Create,
1649 ObjectType::NetworkPolicy,
1650 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1651 id: id.to_string(),
1652 name: name.clone(),
1653 }),
1654 )?;
1655
1656 info!("created network policy {name} ({id})");
1657 }
1658 Op::Comment {
1659 object_id,
1660 sub_component,
1661 comment,
1662 } => {
1663 tx.update_comment(object_id, sub_component, comment)?;
1664 let entry = state.get_comment_id_entry(&object_id);
1665 let should_log = entry
1666 .map(|entry| Self::should_audit_log_item(entry.item()))
1667 .unwrap_or(true);
1669 if let (Some(conn_id), true) =
1672 (session.map(|session| session.conn_id()), should_log)
1673 {
1674 CatalogState::add_to_audit_log(
1675 &state.system_configuration,
1676 oracle_write_ts,
1677 session,
1678 tx,
1679 audit_events,
1680 EventType::Comment,
1681 comment_id_to_audit_object_type(object_id),
1682 EventDetails::IdNameV1(IdNameV1 {
1683 id: format!("{object_id:?}"),
1685 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1686 }),
1687 )?;
1688 }
1689 }
1690 Op::UpdateSourceReferences {
1691 source_id,
1692 references,
1693 } => {
1694 tx.update_source_references(
1695 source_id,
1696 references
1697 .references
1698 .into_iter()
1699 .map(|reference| reference.into())
1700 .collect(),
1701 references.updated_at,
1702 )?;
1703 }
1704 Op::DropObjects(drop_object_infos) => {
1705 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1707
1708 tx.drop_comments(&delta.comments)?;
1710
1711 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1713 delta
1714 .items
1715 .iter()
1716 .map(|id| id)
1717 .partition(|id| !state.get_entry(*id).item().is_temporary());
1718 tx.remove_items(&durable_items_to_drop)?;
1719 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1720 let entry = state.get_entry(&id);
1721 (entry.clone().into(), StateDiff::Retraction)
1722 }));
1723
1724 for item_id in delta.items {
1725 let entry = state.get_entry(&item_id);
1726
1727 if entry.item().is_storage_collection() {
1728 storage_collections_to_drop.extend(entry.global_ids());
1729 }
1730
1731 if state.source_references.contains_key(&item_id) {
1732 tx.remove_source_references(item_id)?;
1733 }
1734
1735 if Self::should_audit_log_item(entry.item()) {
1736 CatalogState::add_to_audit_log(
1737 &state.system_configuration,
1738 oracle_write_ts,
1739 session,
1740 tx,
1741 audit_events,
1742 EventType::Drop,
1743 catalog_type_to_audit_object_type(entry.item().typ()),
1744 EventDetails::IdFullNameV1(IdFullNameV1 {
1745 id: item_id.to_string(),
1746 name: Self::full_name_detail(&state.resolve_full_name(
1747 entry.name(),
1748 session.map(|session| session.conn_id()),
1749 )),
1750 }),
1751 )?;
1752 }
1753 info!(
1754 "drop {} {} ({})",
1755 entry.item_type(),
1756 state.resolve_full_name(entry.name(), entry.conn_id()),
1757 item_id
1758 );
1759 }
1760
1761 let schemas = delta
1763 .schemas
1764 .iter()
1765 .map(|(schema_spec, database_spec)| {
1766 (SchemaId::from(schema_spec), *database_spec)
1767 })
1768 .collect();
1769 tx.remove_schemas(&schemas)?;
1770
1771 for (schema_spec, database_spec) in delta.schemas {
1772 let schema = state.get_schema(
1773 &database_spec,
1774 &schema_spec,
1775 session
1776 .map(|session| session.conn_id())
1777 .unwrap_or(&SYSTEM_CONN_ID),
1778 );
1779
1780 let schema_id = SchemaId::from(schema_spec);
1781 let database_id = match database_spec {
1782 ResolvedDatabaseSpecifier::Ambient => None,
1783 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1784 };
1785
1786 CatalogState::add_to_audit_log(
1787 &state.system_configuration,
1788 oracle_write_ts,
1789 session,
1790 tx,
1791 audit_events,
1792 EventType::Drop,
1793 ObjectType::Schema,
1794 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1795 id: schema_id.to_string(),
1796 name: schema.name.schema.to_string(),
1797 database_name: database_id
1798 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1799 }),
1800 )?;
1801 }
1802
1803 tx.remove_databases(&delta.databases)?;
1805
1806 for database_id in delta.databases {
1807 let database = state.get_database(&database_id).clone();
1808
1809 CatalogState::add_to_audit_log(
1810 &state.system_configuration,
1811 oracle_write_ts,
1812 session,
1813 tx,
1814 audit_events,
1815 EventType::Drop,
1816 ObjectType::Database,
1817 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1818 id: database_id.to_string(),
1819 name: database.name.clone(),
1820 }),
1821 )?;
1822 }
1823
1824 tx.remove_user_roles(&delta.roles)?;
1826
1827 for role_id in delta.roles {
1828 let role = state
1829 .roles_by_id
1830 .get(&role_id)
1831 .expect("catalog out of sync");
1832
1833 CatalogState::add_to_audit_log(
1834 &state.system_configuration,
1835 oracle_write_ts,
1836 session,
1837 tx,
1838 audit_events,
1839 EventType::Drop,
1840 ObjectType::Role,
1841 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1842 id: role.id.to_string(),
1843 name: role.name.clone(),
1844 }),
1845 )?;
1846 info!("drop role {}", role.name());
1847 }
1848
1849 tx.remove_network_policies(&delta.network_policies)?;
1851
1852 for network_policy_id in delta.network_policies {
1853 let policy = state
1854 .network_policies_by_id
1855 .get(&network_policy_id)
1856 .expect("catalog out of sync");
1857
1858 CatalogState::add_to_audit_log(
1859 &state.system_configuration,
1860 oracle_write_ts,
1861 session,
1862 tx,
1863 audit_events,
1864 EventType::Drop,
1865 ObjectType::NetworkPolicy,
1866 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1867 id: policy.id.to_string(),
1868 name: policy.name.clone(),
1869 }),
1870 )?;
1871 info!("drop network policy {}", policy.name.clone());
1872 }
1873
1874 let replicas = delta.replicas.keys().copied().collect();
1876 tx.remove_cluster_replicas(&replicas)?;
1877
1878 for (replica_id, (cluster_id, reason)) in delta.replicas {
1879 let cluster = state.get_cluster(cluster_id);
1880 let replica = cluster.replica(replica_id).expect("Must exist");
1881
1882 let (reason, scheduling_policies) = reason.into_audit_log();
1883 let details =
1884 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1885 cluster_id: cluster_id.to_string(),
1886 cluster_name: cluster.name.clone(),
1887 replica_id: Some(replica_id.to_string()),
1888 replica_name: replica.name.clone(),
1889 reason,
1890 scheduling_policies,
1891 });
1892 CatalogState::add_to_audit_log(
1893 &state.system_configuration,
1894 oracle_write_ts,
1895 session,
1896 tx,
1897 audit_events,
1898 EventType::Drop,
1899 ObjectType::ClusterReplica,
1900 details,
1901 )?;
1902 }
1903
1904 tx.remove_clusters(&delta.clusters)?;
1906
1907 for cluster_id in delta.clusters {
1908 let cluster = state.get_cluster(cluster_id);
1909
1910 CatalogState::add_to_audit_log(
1911 &state.system_configuration,
1912 oracle_write_ts,
1913 session,
1914 tx,
1915 audit_events,
1916 EventType::Drop,
1917 ObjectType::Cluster,
1918 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1919 id: cluster.id.to_string(),
1920 name: cluster.name.clone(),
1921 }),
1922 )?;
1923 }
1924 }
1925 Op::GrantRole {
1926 role_id,
1927 member_id,
1928 grantor_id,
1929 } => {
1930 state.ensure_not_reserved_role(&member_id)?;
1931 state.ensure_grantable_role(&role_id)?;
1932 if state.collect_role_membership(&role_id).contains(&member_id) {
1933 let group_role = state.get_role(&role_id);
1934 let member_role = state.get_role(&member_id);
1935 return Err(AdapterError::Catalog(Error::new(
1936 ErrorKind::CircularRoleMembership {
1937 role_name: group_role.name().to_string(),
1938 member_name: member_role.name().to_string(),
1939 },
1940 )));
1941 }
1942 let mut member_role = state.get_role(&member_id).clone();
1943 member_role.membership.map.insert(role_id, grantor_id);
1944 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1945
1946 CatalogState::add_to_audit_log(
1947 &state.system_configuration,
1948 oracle_write_ts,
1949 session,
1950 tx,
1951 audit_events,
1952 EventType::Grant,
1953 ObjectType::Role,
1954 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1955 role_id: role_id.to_string(),
1956 member_id: member_id.to_string(),
1957 grantor_id: grantor_id.to_string(),
1958 executed_by: session
1959 .map(|session| session.authenticated_role_id())
1960 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1961 .to_string(),
1962 }),
1963 )?;
1964 }
1965 Op::RevokeRole {
1966 role_id,
1967 member_id,
1968 grantor_id,
1969 } => {
1970 state.ensure_not_reserved_role(&member_id)?;
1971 state.ensure_grantable_role(&role_id)?;
1972 let mut member_role = state.get_role(&member_id).clone();
1973 member_role.membership.map.remove(&role_id);
1974 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1975
1976 CatalogState::add_to_audit_log(
1977 &state.system_configuration,
1978 oracle_write_ts,
1979 session,
1980 tx,
1981 audit_events,
1982 EventType::Revoke,
1983 ObjectType::Role,
1984 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1985 role_id: role_id.to_string(),
1986 member_id: member_id.to_string(),
1987 grantor_id: grantor_id.to_string(),
1988 executed_by: session
1989 .map(|session| session.authenticated_role_id())
1990 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1991 .to_string(),
1992 }),
1993 )?;
1994 }
            // Grants or revokes a single privilege on `target_id` (an object or the
            // system itself), persists the updated privileges, and records an audit
            // event whose type is derived from `variant`.
            Op::UpdatePrivilege {
                target_id,
                privilege,
                variant,
            } => {
                // Applies the requested grant/revoke to whichever privilege map the
                // target owns.
                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
                    UpdatePrivilegeVariant::Grant => {
                        privileges.grant(privilege);
                    }
                    UpdatePrivilegeVariant::Revoke => {
                        privileges.revoke(&privilege);
                    }
                };
                // Each object kind follows the same pattern: clone the current state,
                // update the clone's privileges, and write the clone to the transaction.
                match &target_id {
                    SystemObjectId::Object(object_id) => match object_id {
                        ObjectId::Cluster(id) => {
                            let mut cluster = state.get_cluster(*id).clone();
                            update_privilege_fn(&mut cluster.privileges);
                            tx.update_cluster(*id, cluster.into())?;
                        }
                        ObjectId::Database(id) => {
                            let mut database = state.get_database(id).clone();
                            update_privilege_fn(&mut database.privileges);
                            tx.update_database(*id, database.into())?;
                        }
                        ObjectId::NetworkPolicy(id) => {
                            let mut policy = state.get_network_policy(id).clone();
                            update_privilege_fn(&mut policy.privileges);
                            tx.update_network_policy(*id, policy.into())?;
                        }
                        ObjectId::Schema((database_spec, schema_spec)) => {
                            let schema_id = schema_spec.clone().into();
                            let mut schema = state
                                .get_schema(
                                    database_spec,
                                    schema_spec,
                                    session
                                        .map(|session| session.conn_id())
                                        .unwrap_or(&SYSTEM_CONN_ID),
                                )
                                .clone();
                            update_privilege_fn(&mut schema.privileges);
                            tx.update_schema(schema_id, schema.into())?;
                        }
                        ObjectId::Item(id) => {
                            let entry = state.get_entry(id);
                            let mut new_entry = entry.clone();
                            update_privilege_fn(&mut new_entry.privileges);
                            if !new_entry.item().is_temporary() {
                                tx.update_item(*id, new_entry.into())?;
                            } else {
                                // Temporary items are not written to the transaction;
                                // the change is queued as a retraction of the old
                                // entry followed by an addition of the new one.
                                temporary_item_updates
                                    .push((entry.clone().into(), StateDiff::Retraction));
                                temporary_item_updates
                                    .push((new_entry.into(), StateDiff::Addition));
                            }
                        }
                        // Nothing is written to the transaction for roles or replicas.
                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
                    },
                    SystemObjectId::System => {
                        // Apply the change to a copy of the system privileges, then
                        // persist the resulting ACL mode for this grantee/grantor pair
                        // (`None` when `get_acl_item` finds no item for the pair).
                        let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
                        update_privilege_fn(&mut system_privileges);
                        let new_privilege =
                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
                        tx.set_system_privilege(
                            privilege.grantee,
                            privilege.grantor,
                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
                        )?;
                    }
                }
                let object_type = state.get_system_object_type(&target_id);
                // The system itself has no object ID, so a fixed label is logged instead.
                let object_id_str = match &target_id {
                    SystemObjectId::System => "SYSTEM".to_string(),
                    SystemObjectId::Object(id) => id.to_string(),
                };
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    variant.into(),
                    system_object_type_to_audit_object_type(&object_type),
                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                        object_id: object_id_str,
                        grantee_id: privilege.grantee.to_string(),
                        grantor_id: privilege.grantor.to_string(),
                        privileges: privilege.acl_mode.to_string(),
                    }),
                )?;
            }
2087 Op::UpdateDefaultPrivilege {
2088 privilege_object,
2089 privilege_acl_item,
2090 variant,
2091 } => {
2092 let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
2093 match variant {
2094 UpdatePrivilegeVariant::Grant => default_privileges
2095 .grant(privilege_object.clone(), privilege_acl_item.clone()),
2096 UpdatePrivilegeVariant::Revoke => {
2097 default_privileges.revoke(&privilege_object, &privilege_acl_item)
2098 }
2099 }
2100 let new_acl_mode = default_privileges
2101 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
2102 tx.set_default_privilege(
2103 privilege_object.role_id,
2104 privilege_object.database_id,
2105 privilege_object.schema_id,
2106 privilege_object.object_type,
2107 privilege_acl_item.grantee,
2108 new_acl_mode.cloned(),
2109 )?;
2110 CatalogState::add_to_audit_log(
2111 &state.system_configuration,
2112 oracle_write_ts,
2113 session,
2114 tx,
2115 audit_events,
2116 variant.into(),
2117 object_type_to_audit_object_type(privilege_object.object_type),
2118 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
2119 role_id: privilege_object.role_id.to_string(),
2120 database_id: privilege_object.database_id.map(|id| id.to_string()),
2121 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
2122 grantee_id: privilege_acl_item.grantee.to_string(),
2123 privileges: privilege_acl_item.acl_mode.to_string(),
2124 }),
2125 )?;
2126 }
2127 Op::RenameCluster {
2128 id,
2129 name,
2130 to_name,
2131 check_reserved_names,
2132 } => {
2133 if id.is_system() {
2134 return Err(AdapterError::Catalog(Error::new(
2135 ErrorKind::ReadOnlyCluster(name.clone()),
2136 )));
2137 }
2138 if check_reserved_names && is_reserved_name(&to_name) {
2139 return Err(AdapterError::Catalog(Error::new(
2140 ErrorKind::ReservedClusterName(to_name),
2141 )));
2142 }
2143 tx.rename_cluster(id, &name, &to_name)?;
2144 CatalogState::add_to_audit_log(
2145 &state.system_configuration,
2146 oracle_write_ts,
2147 session,
2148 tx,
2149 audit_events,
2150 EventType::Alter,
2151 ObjectType::Cluster,
2152 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2153 id: id.to_string(),
2154 old_name: name.clone(),
2155 new_name: to_name.clone(),
2156 }),
2157 )?;
2158 info!("rename cluster {name} to {to_name}");
2159 }
2160 Op::RenameClusterReplica {
2161 cluster_id,
2162 replica_id,
2163 name,
2164 to_name,
2165 } => {
2166 if is_reserved_name(&to_name) {
2167 return Err(AdapterError::Catalog(Error::new(
2168 ErrorKind::ReservedReplicaName(to_name),
2169 )));
2170 }
2171 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2172 CatalogState::add_to_audit_log(
2173 &state.system_configuration,
2174 oracle_write_ts,
2175 session,
2176 tx,
2177 audit_events,
2178 EventType::Alter,
2179 ObjectType::ClusterReplica,
2180 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2181 cluster_id: cluster_id.to_string(),
2182 replica_id: replica_id.to_string(),
2183 old_name: name.replica.as_str().to_string(),
2184 new_name: to_name.clone(),
2185 }),
2186 )?;
2187 info!("rename cluster replica {name} to {to_name}");
2188 }
2189 Op::RenameItem {
2190 id,
2191 to_name,
2192 current_full_name,
2193 } => {
2194 let mut updates = Vec::new();
2195
2196 let entry = state.get_entry(&id);
2197 if let CatalogItem::Type(_) = entry.item() {
2198 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2199 current_full_name.to_string(),
2200 ))));
2201 }
2202
2203 if entry.id().is_system() {
2204 let name = state
2205 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2206 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2207 name.to_string(),
2208 ))));
2209 }
2210
2211 let mut to_full_name = current_full_name.clone();
2212 to_full_name.item.clone_from(&to_name);
2213
2214 let mut to_qualified_name = entry.name().clone();
2215 to_qualified_name.item.clone_from(&to_name);
2216
2217 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2218 id: id.to_string(),
2219 old_name: Self::full_name_detail(¤t_full_name),
2220 new_name: Self::full_name_detail(&to_full_name),
2221 });
2222 if Self::should_audit_log_item(entry.item()) {
2223 CatalogState::add_to_audit_log(
2224 &state.system_configuration,
2225 oracle_write_ts,
2226 session,
2227 tx,
2228 audit_events,
2229 EventType::Alter,
2230 catalog_type_to_audit_object_type(entry.item().typ()),
2231 details,
2232 )?;
2233 }
2234
2235 let mut new_entry = entry.clone();
2237 new_entry.name.item.clone_from(&to_name);
2238 new_entry.item = entry
2239 .item()
2240 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2241 .map_err(|e| {
2242 Error::new(ErrorKind::from(AmbiguousRename {
2243 depender: state
2244 .resolve_full_name(entry.name(), entry.conn_id())
2245 .to_string(),
2246 dependee: state
2247 .resolve_full_name(entry.name(), entry.conn_id())
2248 .to_string(),
2249 message: e,
2250 }))
2251 })?;
2252
2253 for id in entry.referenced_by() {
2254 let dependent_item = state.get_entry(id);
2255 let mut to_entry = dependent_item.clone();
2256 to_entry.item = dependent_item
2257 .item()
2258 .rename_item_refs(
2259 current_full_name.clone(),
2260 to_full_name.item.clone(),
2261 false,
2262 )
2263 .map_err(|e| {
2264 Error::new(ErrorKind::from(AmbiguousRename {
2265 depender: state
2266 .resolve_full_name(
2267 dependent_item.name(),
2268 dependent_item.conn_id(),
2269 )
2270 .to_string(),
2271 dependee: state
2272 .resolve_full_name(entry.name(), entry.conn_id())
2273 .to_string(),
2274 message: e,
2275 }))
2276 })?;
2277
2278 if !to_entry.item().is_temporary() {
2279 tx.update_item(*id, to_entry.into())?;
2280 } else {
2281 temporary_item_updates
2282 .push((dependent_item.clone().into(), StateDiff::Retraction));
2283 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2284 }
2285 updates.push(*id);
2286 }
2287 if !new_entry.item().is_temporary() {
2288 tx.update_item(id, new_entry.into())?;
2289 } else {
2290 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2291 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2292 }
2293
2294 updates.push(id);
2295 for id in updates {
2296 Self::log_update(state, &id);
2297 }
2298 }
            // Renames a schema and rewrites references to it in the schema's items and
            // in the items that reference those items.
            //
            // Errors:
            // * `ReservedSchemaName` when `check_reserved_names` is set and the new name
            //   is reserved.
            // * `AmbientSchemaRename` when the schema lives in the ambient database.
            // * `AmbiguousRename` when rewriting an item's schema references fails.
            // * `ReadOnlySystemSchema` when the schema's specifier is not an ID.
            Op::RenameSchema {
                database_spec,
                schema_spec,
                new_name,
                check_reserved_names,
            } => {
                if check_reserved_names && is_reserved_name(&new_name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedSchemaName(new_name),
                    )));
                }

                // Without a session, resolve the schema using the system connection.
                let conn_id = session
                    .map(|session| session.conn_id())
                    .unwrap_or(&SYSTEM_CONN_ID);

                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
                let cur_name = schema.name().schema.clone();

                // Schemas in the ambient database cannot be renamed.
                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::AmbientSchemaRename(cur_name),
                    )));
                };
                let database = state.get_database(&database_id);
                let database_name = &database.name;

                // IDs of every item touched, logged at the end.
                let mut updates = Vec::new();
                // Durable items to write to the transaction, keyed by item ID.
                let mut items_to_update = BTreeMap::new();

                // Rewrites the schema references of a single item and queues the result.
                // Items already present in `items_to_update` are skipped.
                // NOTE(review): temporary items are never inserted into
                // `items_to_update`, so this guard does not cover them; confirm that a
                // temporary item reachable through several references cannot be queued
                // more than once.
                let mut update_item = |id| {
                    if items_to_update.contains_key(id) {
                        return Ok(());
                    }

                    let entry = state.get_entry(id);

                    // Update our item.
                    let mut new_entry = entry.clone();
                    new_entry.item = entry
                        .item
                        .rename_schema_refs(database_name, &cur_name, &new_name)
                        .map_err(|(s, _i)| {
                            Error::new(ErrorKind::from(AmbiguousRename {
                                depender: state
                                    .resolve_full_name(entry.name(), entry.conn_id())
                                    .to_string(),
                                dependee: format!("{database_name}.{cur_name}"),
                                message: format!("ambiguous reference to schema named {s}"),
                            }))
                        })?;

                    // Durable items are collected for a single batched write; temporary
                    // items are queued as a retraction plus an addition.
                    if !new_entry.item().is_temporary() {
                        items_to_update.insert(*id, new_entry.into());
                    } else {
                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
                    }
                    updates.push(id);

                    Ok::<_, AdapterError>(())
                };

                // Update all of the items in the schema.
                for (_name, item_id) in &schema.items {
                    // Update the item itself.
                    update_item(item_id)?;

                    // Update all of the items that reference this item.
                    for id in state.get_entry(item_id).referenced_by() {
                        update_item(id)?;
                    }
                }
                // All durable item updates are handed to the transaction in one call.
                tx.update_items(items_to_update)?;

                // Only schemas identified by an ID can be renamed; any other specifier
                // is reported as a read-only system schema.
                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
                    let schema_name = schema.name().schema.clone();
                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
                    )));
                };

                // Add an entry to the audit log.
                let database_name = database_spec
                    .id()
                    .map(|id| state.get_database(&id).name.clone());
                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
                    id: schema_id.to_string(),
                    old_name: schema.name().schema.clone(),
                    new_name: new_name.clone(),
                    database_name,
                });
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Alter,
                    mz_audit_log::ObjectType::Schema,
                    details,
                )?;

                // Update the schema itself.
                let mut new_schema = schema.clone();
                new_schema.name.schema.clone_from(&new_name);
                tx.update_schema(schema_id, new_schema.into())?;

                for id in updates {
                    Self::log_update(state, id);
                }
            }
2415 Op::UpdateOwner { id, new_owner } => {
2416 let conn_id = session
2417 .map(|session| session.conn_id())
2418 .unwrap_or(&SYSTEM_CONN_ID);
2419 let old_owner = state
2420 .get_owner_id(&id, conn_id)
2421 .expect("cannot update the owner of an object without an owner");
2422 match &id {
2423 ObjectId::Cluster(id) => {
2424 let mut cluster = state.get_cluster(*id).clone();
2425 if id.is_system() {
2426 return Err(AdapterError::Catalog(Error::new(
2427 ErrorKind::ReadOnlyCluster(cluster.name),
2428 )));
2429 }
2430 Self::update_privilege_owners(
2431 &mut cluster.privileges,
2432 cluster.owner_id,
2433 new_owner,
2434 );
2435 cluster.owner_id = new_owner;
2436 tx.update_cluster(*id, cluster.into())?;
2437 }
2438 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2439 let cluster = state.get_cluster(*cluster_id);
2440 let mut replica = cluster
2441 .replica(*replica_id)
2442 .expect("catalog out of sync")
2443 .clone();
2444 if replica_id.is_system() {
2445 return Err(AdapterError::Catalog(Error::new(
2446 ErrorKind::ReadOnlyClusterReplica(replica.name),
2447 )));
2448 }
2449 replica.owner_id = new_owner;
2450 tx.update_cluster_replica(*replica_id, replica.into())?;
2451 }
2452 ObjectId::Database(id) => {
2453 let mut database = state.get_database(id).clone();
2454 if id.is_system() {
2455 return Err(AdapterError::Catalog(Error::new(
2456 ErrorKind::ReadOnlyDatabase(database.name),
2457 )));
2458 }
2459 Self::update_privilege_owners(
2460 &mut database.privileges,
2461 database.owner_id,
2462 new_owner,
2463 );
2464 database.owner_id = new_owner;
2465 tx.update_database(*id, database.clone().into())?;
2466 }
2467 ObjectId::Schema((database_spec, schema_spec)) => {
2468 let schema_id: SchemaId = schema_spec.clone().into();
2469 let mut schema = state
2470 .get_schema(database_spec, schema_spec, conn_id)
2471 .clone();
2472 if schema_id.is_system() {
2473 let name = schema.name();
2474 let full_name = state.resolve_full_schema_name(name);
2475 return Err(AdapterError::Catalog(Error::new(
2476 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2477 )));
2478 }
2479 Self::update_privilege_owners(
2480 &mut schema.privileges,
2481 schema.owner_id,
2482 new_owner,
2483 );
2484 schema.owner_id = new_owner;
2485 tx.update_schema(schema_id, schema.into())?;
2486 }
2487 ObjectId::Item(id) => {
2488 let entry = state.get_entry(id);
2489 let mut new_entry = entry.clone();
2490 if id.is_system() {
2491 let full_name = state.resolve_full_name(
2492 new_entry.name(),
2493 session.map(|session| session.conn_id()),
2494 );
2495 return Err(AdapterError::Catalog(Error::new(
2496 ErrorKind::ReadOnlyItem(full_name.to_string()),
2497 )));
2498 }
2499 Self::update_privilege_owners(
2500 &mut new_entry.privileges,
2501 new_entry.owner_id,
2502 new_owner,
2503 );
2504 new_entry.owner_id = new_owner;
2505 if !new_entry.item().is_temporary() {
2506 tx.update_item(*id, new_entry.into())?;
2507 } else {
2508 temporary_item_updates
2509 .push((entry.clone().into(), StateDiff::Retraction));
2510 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2511 }
2512 }
2513 ObjectId::NetworkPolicy(id) => {
2514 let mut policy = state.get_network_policy(id).clone();
2515 if id.is_system() {
2516 return Err(AdapterError::Catalog(Error::new(
2517 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2518 )));
2519 }
2520 Self::update_privilege_owners(
2521 &mut policy.privileges,
2522 policy.owner_id,
2523 new_owner,
2524 );
2525 policy.owner_id = new_owner;
2526 tx.update_network_policy(*id, policy.into())?;
2527 }
2528 ObjectId::Role(_) => unreachable!("roles have no owner"),
2529 }
2530 let object_type = state.get_object_type(&id);
2531 CatalogState::add_to_audit_log(
2532 &state.system_configuration,
2533 oracle_write_ts,
2534 session,
2535 tx,
2536 audit_events,
2537 EventType::Alter,
2538 object_type_to_audit_object_type(object_type),
2539 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2540 object_id: id.to_string(),
2541 old_owner_id: old_owner.to_string(),
2542 new_owner_id: new_owner.to_string(),
2543 }),
2544 )?;
2545 }
2546 Op::UpdateClusterConfig { id, name, config } => {
2547 let mut cluster = state.get_cluster(id).clone();
2548 cluster.config = config;
2549 tx.update_cluster(id, cluster.into())?;
2550 info!("update cluster {}", name);
2551
2552 CatalogState::add_to_audit_log(
2553 &state.system_configuration,
2554 oracle_write_ts,
2555 session,
2556 tx,
2557 audit_events,
2558 EventType::Alter,
2559 ObjectType::Cluster,
2560 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2561 id: id.to_string(),
2562 name,
2563 }),
2564 )?;
2565 }
2566 Op::UpdateClusterReplicaConfig {
2567 replica_id,
2568 cluster_id,
2569 config,
2570 } => {
2571 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2572 info!("update replica {}", replica.name);
2573 tx.update_cluster_replica(
2574 replica_id,
2575 mz_catalog::durable::ClusterReplica {
2576 cluster_id,
2577 replica_id,
2578 name: replica.name.clone(),
2579 config: config.clone().into(),
2580 owner_id: replica.owner_id,
2581 },
2582 )?;
2583 }
2584 Op::UpdateItem { id, name, to_item } => {
2585 let mut entry = state.get_entry(&id).clone();
2586 entry.name = name.clone();
2587 entry.item = to_item.clone();
2588 tx.update_item(id, entry.into())?;
2589
2590 if Self::should_audit_log_item(&to_item) {
2591 let mut full_name = Self::full_name_detail(
2592 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2593 );
2594 full_name.item = name.item;
2595
2596 CatalogState::add_to_audit_log(
2597 &state.system_configuration,
2598 oracle_write_ts,
2599 session,
2600 tx,
2601 audit_events,
2602 EventType::Alter,
2603 catalog_type_to_audit_object_type(to_item.typ()),
2604 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2605 id: id.to_string(),
2606 name: full_name,
2607 }),
2608 )?;
2609 }
2610
2611 Self::log_update(state, &id);
2612 }
2613 Op::UpdateSystemConfiguration { name, value } => {
2614 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2615 tx.upsert_system_config(&name, parsed_value.clone())?;
2616 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2621 let with_0dt_deployment_max_wait =
2622 Duration::parse(VarInput::Flat(&parsed_value))
2623 .expect("parsing succeeded above");
2624 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2625 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2626 let with_0dt_deployment_ddl_check_interval =
2627 Duration::parse(VarInput::Flat(&parsed_value))
2628 .expect("parsing succeeded above");
2629 tx.set_0dt_deployment_ddl_check_interval(
2630 with_0dt_deployment_ddl_check_interval,
2631 )?;
2632 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2633 let panic_after_timeout =
2634 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2635 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2636 }
2637
2638 CatalogState::add_to_audit_log(
2639 &state.system_configuration,
2640 oracle_write_ts,
2641 session,
2642 tx,
2643 audit_events,
2644 EventType::Alter,
2645 ObjectType::System,
2646 EventDetails::SetV1(mz_audit_log::SetV1 {
2647 name,
2648 value: Some(value.borrow().to_vec().join(", ")),
2649 }),
2650 )?;
2651 }
2652 Op::ResetSystemConfiguration { name } => {
2653 tx.remove_system_config(&name);
2654 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2659 tx.reset_0dt_deployment_max_wait()?;
2660 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2661 tx.reset_0dt_deployment_ddl_check_interval()?;
2662 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2663 tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2664 }
2665
2666 CatalogState::add_to_audit_log(
2667 &state.system_configuration,
2668 oracle_write_ts,
2669 session,
2670 tx,
2671 audit_events,
2672 EventType::Alter,
2673 ObjectType::System,
2674 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2675 )?;
2676 }
2677 Op::ResetAllSystemConfiguration => {
2678 tx.clear_system_configs();
2679 tx.reset_0dt_deployment_max_wait()?;
2680 tx.reset_0dt_deployment_ddl_check_interval()?;
2681 tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2682
2683 CatalogState::add_to_audit_log(
2684 &state.system_configuration,
2685 oracle_write_ts,
2686 session,
2687 tx,
2688 audit_events,
2689 EventType::Alter,
2690 ObjectType::System,
2691 EventDetails::ResetAllV1,
2692 )?;
2693 }
2694 Op::InjectAuditEvents { events } => {
2695 for event in events {
2696 let id = tx.allocate_audit_log_id()?;
2697 let ev = VersionedEvent::new(
2698 id,
2699 event.event_type,
2700 event.object_type,
2701 event.details,
2702 event.user,
2703 oracle_write_ts.into(),
2704 );
2705 audit_events.push(ev.clone());
2706 tx.insert_audit_log_event(ev);
2707 }
2708 }
2709 };
2710 Ok(temporary_item_updates)
2711 }
2712
2713 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2714 let entry = state.get_entry(id);
2715 info!(
2716 "update {} {} ({})",
2717 entry.item_type(),
2718 state.resolve_full_name(entry.name(), entry.conn_id()),
2719 id
2720 );
2721 }
2722
2723 fn update_privilege_owners(
2727 privileges: &mut PrivilegeMap,
2728 old_owner: RoleId,
2729 new_owner: RoleId,
2730 ) {
2731 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2733
2734 let mut new_present = false;
2735 for privilege in flat_privileges.iter_mut() {
2736 if privilege.grantor == old_owner {
2739 privilege.grantor = new_owner;
2740 } else if privilege.grantor == new_owner {
2741 new_present = true;
2742 }
2743 if privilege.grantee == old_owner {
2745 privilege.grantee = new_owner;
2746 } else if privilege.grantee == new_owner {
2747 new_present = true;
2748 }
2749 }
2750
2751 if new_present {
2755 let privilege_map: BTreeMap<_, Vec<_>> =
2757 flat_privileges
2758 .into_iter()
2759 .fold(BTreeMap::new(), |mut accum, privilege| {
2760 accum
2761 .entry((privilege.grantee, privilege.grantor))
2762 .or_default()
2763 .push(privilege);
2764 accum
2765 });
2766
2767 flat_privileges = privilege_map
2769 .into_iter()
2770 .map(|((grantee, grantor), values)|
2771 values.into_iter().fold(
2773 MzAclItem::empty(grantee, grantor),
2774 |mut accum, mz_aclitem| {
2775 accum.acl_mode =
2776 accum.acl_mode.union(mz_aclitem.acl_mode);
2777 accum
2778 },
2779 ))
2780 .collect();
2781 }
2782
2783 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2784 }
2785}
2786
2787fn tx_replace_item(
2796 tx: &mut Transaction<'_>,
2797 state: &CatalogState,
2798 id: CatalogItemId,
2799 new_entry: CatalogEntry,
2800) -> Result<(), AdapterError> {
2801 let new_id = new_entry.id;
2802
2803 for use_id in new_entry.referenced_by() {
2805 if tx.get_item(use_id).is_none() {
2807 continue;
2808 }
2809
2810 let mut dependent = state.get_entry(use_id).clone();
2811 dependent.item = dependent.item.replace_item_refs(id, new_id);
2812 tx.update_item(*use_id, dependent.into())?;
2813 }
2814
2815 let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2817 let new_comment_id = new_entry.comment_object_id();
2818 if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2819 tx.drop_comments(&[old_comment_id].into())?;
2820 for (sub, comment) in comments {
2821 tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2822 }
2823 }
2824
2825 let mz_catalog::durable::Item {
2826 id: _,
2827 oid,
2828 global_id,
2829 schema_id,
2830 name,
2831 create_sql,
2832 owner_id,
2833 privileges,
2834 extra_versions,
2835 } = new_entry.into();
2836
2837 tx.remove_item(id)?;
2838 tx.insert_item(
2839 new_id,
2840 oid,
2841 global_id,
2842 schema_id,
2843 &name,
2844 create_sql,
2845 owner_id,
2846 privileges,
2847 extra_versions,
2848 )?;
2849
2850 Ok(())
2851}
2852
2853fn apply_replacement_audit_events(
2855 state: &CatalogState,
2856 target: &CatalogEntry,
2857 replacement: &CatalogEntry,
2858) -> Vec<(EventType, EventDetails)> {
2859 let mut events = Vec::new();
2860
2861 let target_name = state.resolve_full_name(target.name(), target.conn_id());
2862 let target_id_name = IdFullNameV1 {
2863 id: target.id().to_string(),
2864 name: Catalog::full_name_detail(&target_name),
2865 };
2866 let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2867 let replacement_id_name = IdFullNameV1 {
2868 id: replacement.id().to_string(),
2869 name: Catalog::full_name_detail(&replacement_name),
2870 };
2871
2872 if Catalog::should_audit_log_item(&replacement.item) {
2873 events.push((
2874 EventType::Drop,
2875 EventDetails::IdFullNameV1(replacement_id_name.clone()),
2876 ));
2877 }
2878
2879 if Catalog::should_audit_log_item(&target.item) {
2880 events.push((
2881 EventType::Alter,
2882 EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2883 target: target_id_name.clone(),
2884 replacement: replacement_id_name,
2885 }),
2886 ));
2887
2888 if let Some(old_cluster_id) = target.cluster_id()
2889 && let Some(new_cluster_id) = replacement.cluster_id()
2890 && old_cluster_id != new_cluster_id
2891 {
2892 events.push((
2895 EventType::Alter,
2896 EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2897 id: replacement.id().to_string(),
2898 name: target_id_name.name,
2899 old_cluster_id: old_cluster_id.to_string(),
2900 new_cluster_id: new_cluster_id.to_string(),
2901 }),
2902 ));
2903 }
2904 }
2905
2906 events
2907}
2908
2909#[derive(Debug, Default)]
2917pub(crate) struct ObjectsToDrop {
2918 pub comments: BTreeSet<CommentObjectId>,
2919 pub databases: BTreeSet<DatabaseId>,
2920 pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2921 pub clusters: BTreeSet<ClusterId>,
2922 pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2923 pub roles: BTreeSet<RoleId>,
2924 pub items: Vec<CatalogItemId>,
2925 pub network_policies: BTreeSet<NetworkPolicyId>,
2926}
2927
2928impl ObjectsToDrop {
2929 pub fn generate(
2930 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2931 state: &CatalogState,
2932 session: Option<&ConnMeta>,
2933 ) -> Result<Self, AdapterError> {
2934 let mut delta = ObjectsToDrop::default();
2935
2936 for drop_object_info in drop_object_infos {
2937 delta.add_item(drop_object_info, state, session)?;
2938 }
2939
2940 Ok(delta)
2941 }
2942
2943 fn add_item(
2944 &mut self,
2945 drop_object_info: DropObjectInfo,
2946 state: &CatalogState,
2947 session: Option<&ConnMeta>,
2948 ) -> Result<(), AdapterError> {
2949 self.comments
2950 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2951
2952 match drop_object_info {
2953 DropObjectInfo::Database(database_id) => {
2954 let database = &state.database_by_id[&database_id];
2955 if database_id.is_system() {
2956 return Err(AdapterError::Catalog(Error::new(
2957 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2958 )));
2959 }
2960
2961 self.databases.insert(database_id);
2962 }
2963 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2964 let schema = state.get_schema(
2965 &database_spec,
2966 &schema_spec,
2967 session
2968 .map(|session| session.conn_id())
2969 .unwrap_or(&SYSTEM_CONN_ID),
2970 );
2971 let schema_id: SchemaId = schema_spec.into();
2972 if schema_id.is_system() {
2973 let name = schema.name();
2974 let full_name = state.resolve_full_schema_name(name);
2975 return Err(AdapterError::Catalog(Error::new(
2976 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2977 )));
2978 }
2979
2980 self.schemas.insert(schema_spec, database_spec);
2981 }
2982 DropObjectInfo::Role(role_id) => {
2983 let name = state.get_role(&role_id).name().to_string();
2984 if role_id.is_system() || role_id.is_predefined() {
2985 return Err(AdapterError::Catalog(Error::new(
2986 ErrorKind::ReservedRoleName(name.clone()),
2987 )));
2988 }
2989 state.ensure_not_reserved_role(&role_id)?;
2990
2991 self.roles.insert(role_id);
2992 }
2993 DropObjectInfo::Cluster(cluster_id) => {
2994 let cluster = state.get_cluster(cluster_id);
2995 let name = &cluster.name;
2996 if cluster_id.is_system() {
2997 return Err(AdapterError::Catalog(Error::new(
2998 ErrorKind::ReadOnlyCluster(name.clone()),
2999 )));
3000 }
3001
3002 self.clusters.insert(cluster_id);
3003 }
3004 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3005 let cluster = state.get_cluster(cluster_id);
3006 let replica = cluster.replica(replica_id).expect("Must exist");
3007
3008 self.replicas
3009 .insert(replica.replica_id, (cluster.id, reason));
3010
3011 let mut seen: BTreeSet<ObjectId> =
3022 self.items.iter().copied().map(ObjectId::Item).collect();
3023 for item_id in &cluster.bound_objects {
3024 let entry = state.get_entry(item_id);
3025 if let CatalogItem::MaterializedView(mv) = entry.item()
3026 && mv.target_replica == Some(replica_id)
3027 && !seen.contains(&ObjectId::Item(*item_id))
3028 {
3029 info!(
3030 "implicitly dropping materialized view {} because target replica was dropped",
3031 entry.name().item,
3032 );
3033 for dep in state.item_dependents(*item_id, &mut seen) {
3034 if let ObjectId::Item(dep_id) = dep {
3035 self.items.push(dep_id);
3036 }
3037 }
3038 }
3039 }
3040 }
3041 DropObjectInfo::Item(item_id) => {
3042 let entry = state.get_entry(&item_id);
3043 if item_id.is_system() {
3044 let name = entry.name();
3045 let full_name =
3046 state.resolve_full_name(name, session.map(|session| session.conn_id()));
3047 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3048 full_name.to_string(),
3049 ))));
3050 }
3051
3052 self.items.push(item_id);
3053 }
3054 DropObjectInfo::NetworkPolicy(network_policy_id) => {
3055 let policy = state.get_network_policy(&network_policy_id);
3056 let name = &policy.name;
3057 if network_policy_id.is_system() {
3058 return Err(AdapterError::Catalog(Error::new(
3059 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3060 )));
3061 }
3062
3063 self.network_policies.insert(network_policy_id);
3064 }
3065 }
3066
3067 Ok(())
3068 }
3069}
3070
#[cfg(test)]
mod tests {
    use mz_catalog::SYSTEM_CONN_ID;
    use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;
    use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::DEFAULT_SCHEMA;
    use mz_sql::catalog::CatalogDatabase;
    use mz_sql::names::{
        ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
    };
    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;

    use crate::catalog::{Catalog, Op};
    use crate::session::DEFAULT_DATABASE_NAME;

    /// After an ownership change, entries that mention the old owner must mention the new owner
    /// instead, and entries that end up with the same grantee and grantor must be merged into
    /// one entry holding the union of their ACL modes.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        let acl = |grantee: RoleId, grantor: RoleId, acl_mode: AclMode| MzAclItem {
            grantee,
            grantor,
            acl_mode,
        };
        let select_and_update = || AclMode::SELECT.union(AclMode::UPDATE);

        // Each scenario: (initial privileges, the single privilege expected afterwards).
        let scenarios = [
            // Old and new owner both granted something to the same third role.
            (
                vec![
                    acl(other_role, old_owner, AclMode::UPDATE),
                    acl(other_role, new_owner, AclMode::SELECT),
                ],
                acl(other_role, new_owner, select_and_update()),
            ),
            // Old and new owner were both granted something by the same third role.
            (
                vec![
                    acl(old_owner, other_role, AclMode::UPDATE),
                    acl(new_owner, other_role, AclMode::SELECT),
                ],
                acl(new_owner, other_role, select_and_update()),
            ),
            // Old and new owner each granted something to themselves.
            (
                vec![
                    acl(old_owner, old_owner, AclMode::UPDATE),
                    acl(new_owner, new_owner, AclMode::SELECT),
                ],
                acl(new_owner, new_owner, select_and_update()),
            ),
        ];

        for (initial, expected) in scenarios {
            let mut privileges = PrivilegeMap::from_mz_acl_items(initial);
            Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
            assert_eq!(1, privileges.all_values().count());
            assert_eq!(
                vec![expected],
                privileges.all_values_owned().collect::<Vec<_>>()
            );
        }
    }

    /// Applies two `CreateItem` ops in two consecutive dry runs (the second starting from the
    /// state and snapshot produced by the first and receiving only the second op) and checks
    /// that the outcome agrees with applying both ops in a single dry run from the base state.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_transact_incremental_dry_run_processes_only_new_ops() {
        Catalog::with_debug(|catalog| async move {
            // Locate the default database and schema that will hold the test tables.
            let database = catalog
                .resolve_database(DEFAULT_DATABASE_NAME)
                .expect("default database");
            let database_name = database.name.clone();
            let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
            let schema = catalog
                .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
                .expect("default schema");
            let schema_name = schema.name.schema.clone();
            let schema_spec = schema.id.clone();

            let (id_t1, global_id_t1) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t1");
            let (id_t2, global_id_t2) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t2");

            let oracle_write_ts = catalog.current_upper().await;

            // Builds the op that creates an empty, system-owned table called `name`.
            let make_table_op = |id, global_id, name: &str| {
                let qualified_name = QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: database_spec.clone(),
                        schema_spec: schema_spec.clone(),
                    },
                    item: name.to_string(),
                };
                let table = Table {
                    create_sql: Some(format!(
                        "CREATE TABLE {database_name}.{schema_name}.{name} ()"
                    )),
                    desc: VersionedRelationDesc::new(RelationDesc::empty()),
                    collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                    conn_id: None,
                    resolved_ids: ResolvedIds::empty(),
                    custom_logical_compaction_window: None,
                    is_retained_metrics_object: false,
                    data_source: TableDataSource::TableWrites { defaults: vec![] },
                };
                Op::CreateItem {
                    id,
                    name: qualified_name,
                    item: CatalogItem::Table(table),
                    owner_id: MZ_SYSTEM_ROLE_ID,
                }
            };

            let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
            let op_t2 = make_table_op(id_t2, global_id_t2, "t2");

            let base_state = catalog.state().clone();

            // First dry run: only t1, starting from the base state without a snapshot.
            let (state_after_t1, snapshot_after_t1) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("first dry run");

            let t1_after_first = state_after_t1.try_get_entry(&id_t1);
            assert!(
                t1_after_first.is_some(),
                "t1 should exist after first dry run"
            );
            assert_eq!(t1_after_first.expect("t1 entry").name().item, "t1");
            assert!(
                state_after_t1.try_get_entry(&id_t2).is_none(),
                "t2 should NOT exist after first dry run"
            );

            // Second dry run: only t2, continuing from the first run's state and snapshot.
            let (state_incremental, _) = catalog
                .transact_incremental_dry_run(
                    &state_after_t1,
                    vec![op_t2.clone()],
                    None,
                    Some(snapshot_after_t1),
                    oracle_write_ts,
                )
                .await
                .expect("second dry run");

            for (id, message) in [
                (&id_t1, "t1 should exist in incremental result"),
                (&id_t2, "t2 should exist in incremental result"),
            ] {
                assert!(state_incremental.try_get_entry(id).is_some(), "{message}");
            }

            // Reference run: both ops at once from the base state.
            let (state_all_at_once, _) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone(), op_t2.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("all-at-once dry run");

            for (id, message) in [
                (&id_t1, "t1 should exist in all-at-once result"),
                (&id_t2, "t2 should exist in all-at-once result"),
            ] {
                assert!(state_all_at_once.try_get_entry(id).is_some(), "{message}");
            }

            // Both approaches must agree on the name and owner of each table.
            for (id, inc_message, all_message) in [
                (&id_t1, "inc t1", "all t1"),
                (&id_t2, "inc t2", "all t2"),
            ] {
                let incremental = state_incremental.try_get_entry(id).expect(inc_message);
                let all_at_once = state_all_at_once.try_get_entry(id).expect(all_message);
                assert_eq!(incremental.name(), all_at_once.name());
                assert_eq!(incremental.owner_id, all_at_once.owner_id);
            }

            catalog.expire().await;
        })
        .await
    }
}