1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35 StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50 AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
51 CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
52 DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
53 RoleVars,
54};
55use mz_sql::names::{
56 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
57 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
58};
59use mz_sql::plan::{NetworkPolicyRule, PlanError};
60use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
61use mz_sql::session::vars::OwnedVarInput;
62use mz_sql::session::vars::{Value as VarValue, VarInput};
63use mz_sql::{DEFAULT_SCHEMA, rbac};
64use mz_sql_parser::ast::{QualifiedReplica, Value};
65use mz_storage_client::storage_collections::StorageCollections;
66use serde::{Deserialize, Serialize};
67use tracing::{info, trace};
68
69use crate::AdapterError;
70use crate::catalog::state::LocalExpressionCache;
71use crate::catalog::{
72 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
73 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
74 is_reserved_role_name, object_type_to_audit_object_type,
75 system_object_type_to_audit_object_type,
76};
77use crate::coord::ConnMeta;
78use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
79use crate::coord::cluster_scheduling::SchedulingDecision;
80use crate::util::ResultExt;
81
/// A pre-built audit-log event supplied by the caller, written to the audit
/// log verbatim via [`Op::InjectAuditEvents`] rather than being derived from
/// a DDL operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InjectedAuditEvent {
    /// The kind of event (create, drop, alter, ...).
    pub event_type: EventType,
    /// The type of object the event concerns.
    pub object_type: ObjectType,
    /// Structured details describing the event.
    pub details: EventDetails,
    /// The user responsible for the event, if known.
    pub user: Option<String>,
}
93
/// A single catalog mutation, applied as part of a catalog transaction (see
/// `Catalog::transact`).
#[derive(Debug, Clone)]
pub enum Op {
    /// Alter the retain-history (compaction window) setting of an item.
    AlterRetainHistory {
        id: CatalogItemId,
        /// The raw `RETAIN HISTORY` value, recorded in the audit log;
        /// `None` resets to the default.
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Alter the timestamp interval of a source.
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        value: Option<Value>,
        interval: Duration,
    },
    /// Alter a role's attributes, password, and variable defaults.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        /// When `true`, the role's password is cleared.
        nopassword: bool,
        vars: RoleVars,
    },
    /// Replace the rules of an existing network policy.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Add a column to a table, registering a new collection version under
    /// `new_global_id`.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    /// Apply a replacement materialized view (`replacement_id`) onto the
    /// original materialized view (`id`), removing the replacement item.
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    /// Create a database together with its default schema.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Create a schema in the given database.
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Create a role.
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    /// Create a cluster along with its introspection source indexes.
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Create a replica of an existing cluster.
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    /// Create a catalog item (table, view, source, ...).
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    /// Create a network policy.
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Set (`Some`) or clear (`None`) a comment on an object, or on one of
    /// its sub-components (e.g. a column) when `sub_component` is set.
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drop the given objects.
    DropObjects(Vec<DropObjectInfo>),
    /// Grant `role_id` to `member_id`, recorded as granted by `grantor_id`.
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    /// Rename a cluster from `name` to `to_name`.
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        /// Whether to reject reserved names for the new name.
        check_reserved_names: bool,
    },
    /// Rename a cluster replica.
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    /// Rename a catalog item.
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    /// Rename a schema.
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        /// Whether to reject reserved names for the new name.
        check_reserved_names: bool,
    },
    /// Change the owner of an object.
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    /// Grant or revoke (per `variant`) a privilege on an object.
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    /// Grant or revoke (per `variant`) a default privilege.
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    /// Revoke `role_id` from `member_id`.
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    /// Replace a cluster's configuration.
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    /// Replace a cluster replica's configuration.
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    /// Replace a catalog item's definition in place.
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    /// Record the set of external references available from a source.
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    /// Set a system configuration parameter.
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    /// Reset a system configuration parameter to its default.
    ResetSystemConfiguration {
        name: String,
    },
    /// Reset all system configuration parameters to their defaults.
    ResetAllSystemConfiguration,
    /// Record a storage usage measurement.
    WeirdStorageUsageUpdates {
        /// The shard/object the measurement is for, if known.
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Write the given pre-built audit events directly to the audit log.
    InjectAuditEvents {
        events: Vec<InjectedAuditEvent>,
    },
}
268
/// Identifies an object to drop, along with any extra information needed to
/// process the drop (currently only the reason a cluster replica is being
/// created/dropped, for audit logging).
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// A cluster replica, tagged with the reason for the drop.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
282
283impl DropObjectInfo {
284 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
287 match id {
288 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
289 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
290 cluster_id,
291 replica_id,
292 ReplicaCreateDropReason::Manual,
293 )),
294 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
295 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
296 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
297 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
298 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
299 }
300 }
301
302 fn to_object_id(&self) -> ObjectId {
305 match &self {
306 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
307 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
308 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
309 }
310 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
311 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
312 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
313 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
314 DropObjectInfo::NetworkPolicy(network_policy_id) => {
315 ObjectId::NetworkPolicy(network_policy_id.clone())
316 }
317 }
318 }
319}
320
/// Why a cluster replica is being created or dropped, recorded in the audit
/// log.
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The replica create/drop was requested directly (not by automation).
    Manual,
    /// The replica create/drop was decided by automated cluster scheduling;
    /// carries the scheduling decisions that led to the action.
    ClusterScheduling(Vec<SchedulingDecision>),
}
333
334impl ReplicaCreateDropReason {
335 pub fn into_audit_log(
336 self,
337 ) -> (
338 CreateOrDropClusterReplicaReasonV1,
339 Option<SchedulingDecisionsWithReasonsV2>,
340 ) {
341 let (reason, scheduling_policies) = match self {
342 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
343 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
344 CreateOrDropClusterReplicaReasonV1::Schedule,
345 Some(scheduling_decisions),
346 ),
347 };
348 (
349 reason,
350 scheduling_policies
351 .as_ref()
352 .map(SchedulingDecision::reasons_to_audit_log_reasons),
353 )
354 }
355}
356
/// The outputs of a successful catalog transaction.
pub struct TransactionResult {
    /// Updates to apply to the builtin tables.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog state updates produced by applying the ops.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit-log events generated by the transaction.
    pub audit_events: Vec<VersionedEvent>,
}
363
/// Controls whether `transact_inner` prepares storage-side state.
#[derive(Debug, Clone, Copy)]
enum TransactInnerMode {
    /// Normal operation: storage collection state is prepared as part of the
    /// transaction (used by `transact`).
    Commit,
    /// Compute the effects of the ops without preparing storage state;
    /// `storage_collections` must be `None` in this mode (used by
    /// `transact_incremental_dry_run`).
    DryRun,
}
378
379impl Catalog {
380 fn should_audit_log_item(item: &CatalogItem) -> bool {
381 !item.is_temporary()
382 }
383
    /// Computes the set of [`CatalogItemId`]s that `ops` will create as
    /// temporary (session-scoped) items.
    ///
    /// `temporary_drops` is the set of `(connection, item name)` pairs that
    /// the same batch of ops is dropping from temporary schemas; an item
    /// re-created in the same batch as its drop is not a name conflict.
    ///
    /// # Errors
    ///
    /// Returns `ItemAlreadyExists` if a temporary item with the same name
    /// already exists for the connection (and is not being dropped in this
    /// batch), or if the batch itself creates the same temporary name twice.
    fn temporary_ids(
        &self,
        ops: &[Op],
        temporary_drops: BTreeSet<(&ConnectionId, String)>,
    ) -> Result<BTreeSet<CatalogItemId>, Error> {
        let mut creating = BTreeSet::new();
        let mut temporary_ids = BTreeSet::new();
        for op in ops.iter() {
            if let Op::CreateItem {
                id,
                name,
                item,
                owner_id: _,
            } = op
            {
                // Only items associated with a connection are temporary.
                if let Some(conn_id) = item.conn_id() {
                    // NB: `&&` binds tighter than `||`, so this condition is
                    // "(already exists AND not being dropped in this batch)
                    // OR already being created earlier in this batch".
                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
                        || creating.contains(&(conn_id, &name.item))
                    {
                        return Err(
                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
                        );
                    } else {
                        creating.insert((conn_id, &name.item));
                        temporary_ids.insert(id.clone());
                    }
                }
            }
        }
        Ok(temporary_ids)
    }
418
    /// Applies the given catalog `ops` transactionally: durably commits them
    /// to catalog storage, then applies the resulting updates to the
    /// in-memory catalog state.
    ///
    /// `storage_collections`, when present, is given the chance to prepare
    /// storage-side state (collection creates/drops/registrations) inside the
    /// same catalog transaction.
    ///
    /// Returns the builtin table updates, parsed catalog updates, and audit
    /// events produced by the transaction.
    #[instrument(name = "catalog::transact")]
    pub async fn transact(
        &mut self,
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        ops: Vec<Op>,
    ) -> Result<TransactionResult, AdapterError> {
        trace!("transact: {:?}", ops);
        // Test-only failure injection point.
        fail::fail_point!("catalog_transact", |arg| {
            Err(AdapterError::Unstructured(anyhow::anyhow!(
                "failpoint: {arg:?}"
            )))
        });

        // All item IDs dropped by any `DropObjects` op in this batch.
        let drop_ids: BTreeSet<CatalogItemId> = ops
            .iter()
            .filter_map(|op| match op {
                Op::DropObjects(drop_object_infos) => {
                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
                    let item_ids = ids.filter_map(|id| match id {
                        ObjectId::Item(id) => Some(id),
                        _ => None,
                    });
                    Some(item_ids)
                }
                _ => None,
            })
            .flatten()
            .collect();
        // The subset of dropped items that are temporary, keyed by
        // `(connection, item name)` for `temporary_ids` below.
        let temporary_drops = drop_ids
            .iter()
            .filter_map(|id| {
                let entry = self.get_entry(id);
                match entry.item().conn_id() {
                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
                    None => None,
                }
            })
            .collect();
        let dropped_global_ids = drop_ids
            .iter()
            .flat_map(|item_id| self.get_global_ids(item_id))
            .collect();

        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = storage
            .transaction()
            .await
            .unwrap_or_terminate("starting catalog transaction");

        let new_state = Self::transact_inner(
            TransactInnerMode::Commit,
            storage_collections,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            &self.state,
        )
        .await?;

        // Durably commit before mutating the in-memory state, so the
        // in-memory catalog never runs ahead of durable state.
        tx.commit(oracle_write_ts)
            .await
            .unwrap_or_terminate("catalog storage transaction commit must succeed");

        drop(storage);
        // `None` means the ops did not change the state.
        if let Some(new_state) = new_state {
            self.transient_revision += 1;
            self.state = new_state;
        }

        // Drop cached plans/metainfos for the dropped collections, and
        // retract any optimizer notices they had emitted.
        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
        if self.state.system_config().enable_mz_notices() {
            self.state().pack_optimizer_notices(
                &mut builtin_table_updates,
                dropped_notices.iter(),
                Diff::MINUS_ONE,
            );
        }

        Ok(TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        })
    }
526
    /// Runs `ops` against `base_state` in dry-run mode: computes the
    /// resulting catalog state and storage snapshot without durably
    /// committing anything.
    ///
    /// `prev_snapshot`, when provided, seeds the storage transaction instead
    /// of the current durable state, allowing several incremental dry runs to
    /// be chained together.
    ///
    /// Returns the would-be new catalog state and the snapshot after the ops.
    pub async fn transact_incremental_dry_run(
        &self,
        base_state: &CatalogState,
        ops: Vec<Op>,
        session: Option<&ConnMeta>,
        prev_snapshot: Option<Snapshot>,
        oracle_write_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogState, Snapshot), AdapterError> {
        // No temporary drops are tracked in a dry run.
        let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;

        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = if let Some(snapshot) = prev_snapshot {
            storage
                .transaction_from_snapshot(snapshot)
                .unwrap_or_terminate("starting catalog transaction from snapshot")
        } else {
            storage
                .transaction()
                .await
                .unwrap_or_terminate("starting catalog transaction")
        };

        let new_state = Self::transact_inner(
            TransactInnerMode::DryRun,
            None,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            base_state,
        )
        .await?;

        // Capture the post-ops snapshot; the transaction itself is dropped
        // without being committed.
        let new_snapshot = tx.current_snapshot();

        drop(storage);

        // `transact_inner` returns `None` when the ops changed nothing.
        let state = new_state.unwrap_or_else(|| base_state.clone());
        Ok((state, new_snapshot))
    }
599
600 fn extract_expressions_from_ops(
604 ops: &[Op],
605 optimizer_features: &OptimizerFeatures,
606 ) -> BTreeMap<GlobalId, LocalExpressions> {
607 let mut exprs = BTreeMap::new();
608
609 for op in ops {
610 if let Op::CreateItem { item, .. } = op {
611 match item {
612 CatalogItem::View(view) => {
613 exprs.insert(
614 view.global_id,
615 LocalExpressions {
616 local_mir: (*view.optimized_expr).clone(),
617 optimizer_features: optimizer_features.clone(),
618 },
619 );
620 }
621 CatalogItem::MaterializedView(mv) => {
622 exprs.insert(
623 mv.global_id_writes(),
624 LocalExpressions {
625 local_mir: (*mv.optimized_expr).clone(),
626 optimizer_features: optimizer_features.clone(),
627 },
628 );
629 }
630 CatalogItem::Table(_)
631 | CatalogItem::Source(_)
632 | CatalogItem::Log(_)
633 | CatalogItem::Sink(_)
634 | CatalogItem::Index(_)
635 | CatalogItem::Type(_)
636 | CatalogItem::Func(_)
637 | CatalogItem::Secret(_)
638 | CatalogItem::Connection(_)
639 | CatalogItem::ContinualTask(_) => {}
640 }
641 }
642 }
643
644 exprs
645 }
646
    /// Applies `ops` to the durable transaction `tx` and to (a copy-on-write
    /// view of) the in-memory `state`.
    ///
    /// Returns `Ok(Some(new_state))` when the ops changed the catalog state,
    /// and `Ok(None)` when `ops` is empty or nothing changed. In `DryRun`
    /// mode no storage-side state is prepared; in `Commit` mode
    /// `storage_collections` (if provided) prepares collection
    /// creates/drops/registrations inside the same transaction.
    ///
    /// Appends side outputs to `builtin_table_updates`,
    /// `parsed_catalog_updates`, and `audit_events`.
    #[instrument(name = "catalog::transact_inner")]
    async fn transact_inner(
        mode: TransactInnerMode,
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        ops: Vec<Op>,
        temporary_ids: BTreeSet<CatalogItemId>,
        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
        audit_events: &mut Vec<VersionedEvent>,
        tx: &mut Transaction<'_>,
        state: &CatalogState,
    ) -> Result<Option<CatalogState>, AdapterError> {
        // `preliminary_state` is advanced after every op so that later ops in
        // the same batch observe the effects of earlier ones; its outputs are
        // discarded.
        let mut preliminary_state = Cow::Borrowed(state);

        // `state` is advanced once with all accumulated updates and is what
        // this function ultimately returns. As a `Cow` it only becomes owned
        // if some update actually mutates it.
        let mut state = Cow::Borrowed(state);

        if ops.is_empty() {
            return Ok(None);
        }

        let optimizer_features = OptimizerFeatures::from(state.system_config());
        // Pre-extract local expressions of created (materialized) views so
        // applying updates can reuse them via the local expression cache.
        let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);

        let mut storage_collections_to_create = BTreeSet::new();
        let mut storage_collections_to_drop = BTreeSet::new();
        let mut storage_collections_to_register = BTreeMap::new();

        let mut updates = Vec::new();

        for op in ops {
            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
                oracle_write_ts,
                session,
                op,
                &temporary_ids,
                audit_events,
                tx,
                &*preliminary_state,
                &mut storage_collections_to_create,
                &mut storage_collections_to_drop,
                &mut storage_collections_to_register,
            )
            .await?;

            if let Some(builtin_table_update) = weird_builtin_table_update {
                builtin_table_updates.push(builtin_table_update);
            }

            // Temporary items are not durable, so synthesize state updates
            // for them at the transaction's current upper.
            let upper = tx.upper();
            let temporary_item_updates =
                temporary_item_updates
                    .into_iter()
                    .map(|(item, diff)| StateUpdate {
                        kind: StateUpdateKind::TemporaryItem(item),
                        ts: upper,
                        diff,
                    });

            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
            op_updates.extend(temporary_item_updates);
            if !op_updates.is_empty() {
                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
                // Advance only the preliminary state here; the real outputs
                // are produced below when applying to `state`.
                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
                    .to_mut()
                    .apply_updates(op_updates.clone(), &mut local_expr_cache)
                    .await;
            }
            updates.append(&mut op_updates);
        }

        if !updates.is_empty() {
            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
            let (op_builtin_table_updates, op_catalog_updates) = state
                .to_mut()
                .apply_updates(updates.clone(), &mut local_expr_cache)
                .await;
            let op_builtin_table_updates = state
                .to_mut()
                .resolve_builtin_table_updates(op_builtin_table_updates);
            builtin_table_updates.extend(op_builtin_table_updates);
            parsed_catalog_updates.extend(op_catalog_updates);
        }

        match mode {
            TransactInnerMode::Commit => {
                if let Some(c) = storage_collections {
                    c.prepare_state(
                        tx,
                        storage_collections_to_create,
                        storage_collections_to_drop,
                        storage_collections_to_register,
                    )
                    .await?;
                }
            }
            TransactInnerMode::DryRun => {
                debug_assert!(
                    storage_collections.is_none(),
                    "dry-run mode must not prepare storage state"
                );
            }
        }

        // `prepare_state` above may have queued additional updates; apply
        // them the same way.
        let updates = tx.get_and_commit_op_updates();
        if !updates.is_empty() {
            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
            let (op_builtin_table_updates, op_catalog_updates) = state
                .to_mut()
                .apply_updates(updates.clone(), &mut local_expr_cache)
                .await;
            let op_builtin_table_updates = state
                .to_mut()
                .resolve_builtin_table_updates(op_builtin_table_updates);
            builtin_table_updates.extend(op_builtin_table_updates);
            parsed_catalog_updates.extend(op_catalog_updates);
        }

        // A still-borrowed `state` means no update ever called `to_mut()`,
        // i.e. nothing changed.
        match state {
            Cow::Owned(state) => Ok(Some(state)),
            Cow::Borrowed(_) => Ok(None),
        }
    }
824
825 #[instrument]
833 async fn transact_op(
834 oracle_write_ts: mz_repr::Timestamp,
835 session: Option<&ConnMeta>,
836 op: Op,
837 temporary_ids: &BTreeSet<CatalogItemId>,
838 audit_events: &mut Vec<VersionedEvent>,
839 tx: &mut Transaction<'_>,
840 state: &CatalogState,
841 storage_collections_to_create: &mut BTreeSet<GlobalId>,
842 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
843 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
844 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
845 let mut weird_builtin_table_update = None;
846 let mut temporary_item_updates = Vec::new();
847
848 match op {
849 Op::AlterRetainHistory { id, value, window } => {
850 let entry = state.get_entry(&id);
851 if id.is_system() {
852 let name = entry.name();
853 let full_name =
854 state.resolve_full_name(name, session.map(|session| session.conn_id()));
855 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
856 full_name.to_string(),
857 ))));
858 }
859
860 let mut new_entry = entry.clone();
861 let previous = new_entry
862 .item
863 .update_retain_history(value.clone(), window)
864 .map_err(|_| {
865 AdapterError::Catalog(Error::new(ErrorKind::Internal(
866 "planner should have rejected invalid alter retain history item type"
867 .to_string(),
868 )))
869 })?;
870
871 if Self::should_audit_log_item(new_entry.item()) {
872 let details =
873 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
874 id: id.to_string(),
875 old_history: previous.map(|previous| previous.to_string()),
876 new_history: value.map(|v| v.to_string()),
877 });
878 CatalogState::add_to_audit_log(
879 &state.system_configuration,
880 oracle_write_ts,
881 session,
882 tx,
883 audit_events,
884 EventType::Alter,
885 catalog_type_to_audit_object_type(new_entry.item().typ()),
886 details,
887 )?;
888 }
889
890 tx.update_item(id, new_entry.into())?;
891
892 Self::log_update(state, &id);
893 }
894 Op::AlterSourceTimestampInterval {
895 id,
896 value,
897 interval,
898 } => {
899 let entry = state.get_entry(&id);
900 if id.is_system() {
901 let name = entry.name();
902 let full_name =
903 state.resolve_full_name(name, session.map(|session| session.conn_id()));
904 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
905 full_name.to_string(),
906 ))));
907 }
908
909 let mut new_entry = entry.clone();
910 new_entry
911 .item
912 .update_timestamp_interval(value, interval)
913 .map_err(|_| {
914 AdapterError::Catalog(Error::new(ErrorKind::Internal(
915 "planner should have rejected invalid alter timestamp interval item type"
916 .to_string(),
917 )))
918 })?;
919
920 tx.update_item(id, new_entry.into())?;
921
922 Self::log_update(state, &id);
923 }
924 Op::AlterRole {
925 id,
926 name,
927 attributes,
928 nopassword,
929 vars,
930 } => {
931 state.ensure_not_reserved_role(&id)?;
932
933 let mut existing_role = state.get_role(&id).clone();
934 let password = attributes.password.clone();
935 let scram_iterations = attributes
936 .scram_iterations
937 .unwrap_or_else(|| state.system_config().scram_iterations());
938 existing_role.attributes = attributes.into();
939 existing_role.vars = vars;
940 let password_action = if nopassword {
941 PasswordAction::Clear
942 } else if let Some(password) = password {
943 PasswordAction::Set(PasswordConfig {
944 password,
945 scram_iterations,
946 })
947 } else {
948 PasswordAction::NoChange
949 };
950 tx.update_role(id, existing_role.into(), password_action)?;
951
952 CatalogState::add_to_audit_log(
953 &state.system_configuration,
954 oracle_write_ts,
955 session,
956 tx,
957 audit_events,
958 EventType::Alter,
959 ObjectType::Role,
960 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
961 id: id.to_string(),
962 name: name.clone(),
963 }),
964 )?;
965
966 info!("update role {name} ({id})");
967 }
968 Op::AlterNetworkPolicy {
969 id,
970 rules,
971 name,
972 owner_id: _owner_id,
973 } => {
974 let existing_policy = state.get_network_policy(&id).clone();
975 let mut policy: NetworkPolicy = existing_policy.into();
976 policy.rules = rules;
977 if is_reserved_name(&name) {
978 return Err(AdapterError::Catalog(Error::new(
979 ErrorKind::ReservedNetworkPolicyName(name),
980 )));
981 }
982 tx.update_network_policy(id, policy.clone())?;
983
984 CatalogState::add_to_audit_log(
985 &state.system_configuration,
986 oracle_write_ts,
987 session,
988 tx,
989 audit_events,
990 EventType::Alter,
991 ObjectType::NetworkPolicy,
992 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
993 id: id.to_string(),
994 name: name.clone(),
995 }),
996 )?;
997
998 info!("update network policy {name} ({id})");
999 }
1000 Op::AlterAddColumn {
1001 id,
1002 new_global_id,
1003 name,
1004 typ,
1005 sql,
1006 } => {
1007 let mut new_entry = state.get_entry(&id).clone();
1008 let version = new_entry.item.add_column(name, typ, sql)?;
1009 let shard_id = state
1012 .storage_metadata()
1013 .get_collection_shard(new_entry.latest_global_id())?;
1014
1015 let CatalogItem::Table(table) = &mut new_entry.item else {
1017 return Err(AdapterError::Unsupported("adding columns to non-Table"));
1018 };
1019 table.collections.insert(version, new_global_id);
1020
1021 tx.update_item(id, new_entry.into())?;
1022 storage_collections_to_register.insert(new_global_id, shard_id);
1023 }
1024 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1025 let mut new_entry = state.get_entry(&id).clone();
1026 let replacement = state.get_entry(&replacement_id);
1027
1028 let new_audit_events =
1029 apply_replacement_audit_events(state, &new_entry, replacement);
1030
1031 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1032 return Err(AdapterError::internal(
1033 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1034 "id must refer to a materialized view",
1035 ));
1036 };
1037 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1038 return Err(AdapterError::internal(
1039 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1040 "replacement_id must refer to a materialized view",
1041 ));
1042 };
1043
1044 mv.apply_replacement(replacement_mv.clone());
1045
1046 tx.remove_item(replacement_id)?;
1047
1048 new_entry.id = replacement_id;
1049 tx_replace_item(tx, state, id, new_entry)?;
1050
1051 let comment_id = CommentObjectId::MaterializedView(replacement_id);
1052 tx.drop_comments(&[comment_id].into())?;
1053
1054 for (event_type, details) in new_audit_events {
1055 CatalogState::add_to_audit_log(
1056 &state.system_configuration,
1057 oracle_write_ts,
1058 session,
1059 tx,
1060 audit_events,
1061 event_type,
1062 ObjectType::MaterializedView,
1063 details,
1064 )?;
1065 }
1066 }
1067 Op::CreateDatabase { name, owner_id } => {
1068 let database_owner_privileges = vec![rbac::owner_privilege(
1069 mz_sql::catalog::ObjectType::Database,
1070 owner_id,
1071 )];
1072 let database_default_privileges = state
1073 .default_privileges
1074 .get_applicable_privileges(
1075 owner_id,
1076 None,
1077 None,
1078 mz_sql::catalog::ObjectType::Database,
1079 )
1080 .map(|item| item.mz_acl_item(owner_id));
1081 let database_privileges: Vec<_> = merge_mz_acl_items(
1082 database_owner_privileges
1083 .into_iter()
1084 .chain(database_default_privileges),
1085 )
1086 .collect();
1087
1088 let schema_owner_privileges = vec![rbac::owner_privilege(
1089 mz_sql::catalog::ObjectType::Schema,
1090 owner_id,
1091 )];
1092 let schema_default_privileges = state
1093 .default_privileges
1094 .get_applicable_privileges(
1095 owner_id,
1096 None,
1097 None,
1098 mz_sql::catalog::ObjectType::Schema,
1099 )
1100 .map(|item| item.mz_acl_item(owner_id))
1101 .chain(std::iter::once(MzAclItem {
1103 grantee: RoleId::Public,
1104 grantor: owner_id,
1105 acl_mode: AclMode::USAGE,
1106 }));
1107 let schema_privileges: Vec<_> = merge_mz_acl_items(
1108 schema_owner_privileges
1109 .into_iter()
1110 .chain(schema_default_privileges),
1111 )
1112 .collect();
1113
1114 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1115 let (database_id, _) = tx.insert_user_database(
1116 &name,
1117 owner_id,
1118 database_privileges.clone(),
1119 &temporary_oids,
1120 )?;
1121 let (schema_id, _) = tx.insert_user_schema(
1122 database_id,
1123 DEFAULT_SCHEMA,
1124 owner_id,
1125 schema_privileges.clone(),
1126 &temporary_oids,
1127 )?;
1128 CatalogState::add_to_audit_log(
1129 &state.system_configuration,
1130 oracle_write_ts,
1131 session,
1132 tx,
1133 audit_events,
1134 EventType::Create,
1135 ObjectType::Database,
1136 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1137 id: database_id.to_string(),
1138 name: name.clone(),
1139 }),
1140 )?;
1141 info!("create database {}", name);
1142
1143 CatalogState::add_to_audit_log(
1144 &state.system_configuration,
1145 oracle_write_ts,
1146 session,
1147 tx,
1148 audit_events,
1149 EventType::Create,
1150 ObjectType::Schema,
1151 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1152 id: schema_id.to_string(),
1153 name: DEFAULT_SCHEMA.to_string(),
1154 database_name: Some(name),
1155 }),
1156 )?;
1157 }
1158 Op::CreateSchema {
1159 database_id,
1160 schema_name,
1161 owner_id,
1162 } => {
1163 if is_reserved_name(&schema_name) {
1164 return Err(AdapterError::Catalog(Error::new(
1165 ErrorKind::ReservedSchemaName(schema_name),
1166 )));
1167 }
1168 let database_id = match database_id {
1169 ResolvedDatabaseSpecifier::Id(id) => id,
1170 ResolvedDatabaseSpecifier::Ambient => {
1171 return Err(AdapterError::Catalog(Error::new(
1172 ErrorKind::ReadOnlySystemSchema(schema_name),
1173 )));
1174 }
1175 };
1176 let owner_privileges = vec![rbac::owner_privilege(
1177 mz_sql::catalog::ObjectType::Schema,
1178 owner_id,
1179 )];
1180 let default_privileges = state
1181 .default_privileges
1182 .get_applicable_privileges(
1183 owner_id,
1184 Some(database_id),
1185 None,
1186 mz_sql::catalog::ObjectType::Schema,
1187 )
1188 .map(|item| item.mz_acl_item(owner_id));
1189 let privileges: Vec<_> =
1190 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1191 .collect();
1192 let (schema_id, _) = tx.insert_user_schema(
1193 database_id,
1194 &schema_name,
1195 owner_id,
1196 privileges.clone(),
1197 &state.get_temporary_oids().collect(),
1198 )?;
1199 CatalogState::add_to_audit_log(
1200 &state.system_configuration,
1201 oracle_write_ts,
1202 session,
1203 tx,
1204 audit_events,
1205 EventType::Create,
1206 ObjectType::Schema,
1207 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1208 id: schema_id.to_string(),
1209 name: schema_name.clone(),
1210 database_name: Some(state.database_by_id[&database_id].name.clone()),
1211 }),
1212 )?;
1213 }
1214 Op::CreateRole { name, attributes } => {
1215 if is_reserved_role_name(&name) {
1216 return Err(AdapterError::Catalog(Error::new(
1217 ErrorKind::ReservedRoleName(name),
1218 )));
1219 }
1220 let membership = RoleMembership::new();
1221 let vars = RoleVars::default();
1222 let (id, _) = tx.insert_user_role(
1223 name.clone(),
1224 attributes.clone(),
1225 membership.clone(),
1226 vars.clone(),
1227 &state.get_temporary_oids().collect(),
1228 )?;
1229 CatalogState::add_to_audit_log(
1230 &state.system_configuration,
1231 oracle_write_ts,
1232 session,
1233 tx,
1234 audit_events,
1235 EventType::Create,
1236 ObjectType::Role,
1237 EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1238 id: id.to_string(),
1239 name: name.clone(),
1240 auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1241 AutoProvisionSource::Oidc => "oidc".to_string(),
1242 AutoProvisionSource::Frontegg => "frontegg".to_string(),
1243 AutoProvisionSource::None => "none".to_string(),
1244 }),
1245 }),
1246 )?;
1247 info!("create role {}", name);
1248 }
            // Create a user cluster together with its introspection source
            // indexes, computing its initial privilege set from the owner's
            // implicit privileges plus any applicable default privileges.
            Op::CreateCluster {
                id,
                name,
                introspection_sources,
                owner_id,
                config,
            } => {
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedClusterName(name),
                    )));
                }
                let owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Cluster,
                    owner_id,
                )];
                // Clusters are not scoped to a database/schema, hence `None, None`.
                let default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::Cluster,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let privileges: Vec<_> =
                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
                        .collect();
                // Allocate (item id, global id) pairs for each introspection
                // source index of the new cluster.
                let introspection_source_ids: Vec<_> = introspection_sources
                    .iter()
                    .map(|introspection_source| {
                        Transaction::allocate_introspection_source_index_id(
                            &id,
                            introspection_source.variant,
                        )
                    })
                    .collect();

                // `zip_eq` panics if the lengths differ, which would indicate an
                // allocation bug above.
                let introspection_sources = introspection_sources
                    .into_iter()
                    .zip_eq(introspection_source_ids)
                    .map(|(log, (item_id, gid))| (log, item_id, gid))
                    .collect();

                tx.insert_user_cluster(
                    id,
                    &name,
                    introspection_sources,
                    owner_id,
                    privileges.clone(),
                    config.clone().into(),
                    &state.get_temporary_oids().collect(),
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::Cluster,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;
                info!("create cluster {}", name);
            }
            // Create a cluster replica. The audit-log entry is only emitted for
            // managed replica locations, since only those carry the size/billing
            // metadata the event records.
            Op::CreateClusterReplica {
                cluster_id,
                name,
                config,
                owner_id,
                reason,
            } => {
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedReplicaName(name),
                    )));
                }
                let cluster = state.get_cluster(cluster_id);
                let id =
                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
                if let ReplicaLocation::Managed(ManagedReplicaLocation {
                    size,
                    billed_as,
                    internal,
                    ..
                }) = &config.location
                {
                    // Split the creation reason into the audit-log reason string
                    // and any scheduling-policy decisions that drove it.
                    let (reason, scheduling_policies) = reason.into_audit_log();
                    let details = EventDetails::CreateClusterReplicaV4(
                        mz_audit_log::CreateClusterReplicaV4 {
                            cluster_id: cluster_id.to_string(),
                            cluster_name: cluster.name.clone(),
                            replica_id: Some(id.to_string()),
                            replica_name: name.clone(),
                            logical_size: size.clone(),
                            billed_as: billed_as.clone(),
                            internal: *internal,
                            reason,
                            scheduling_policies,
                        },
                    );
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Create,
                        ObjectType::ClusterReplica,
                        details,
                    )?;
                }
            }
            // Create a catalog item (table, source, view, sink, ...). This arm:
            //   1. records which storage collections must be created/registered,
            //   2. enforces system-cluster / system-schema write restrictions,
            //   3. computes the item's initial privilege set,
            //   4. persists the item (temporary items in memory, others durably),
            //   5. emits an item-type-specific audit-log event.
            Op::CreateItem {
                id,
                name,
                item,
                owner_id,
            } => {
                state.check_unstable_dependencies(&item)?;

                // Track storage collections the controller must create or
                // register for this item.
                match &item {
                    CatalogItem::Table(table) => {
                        let gids: Vec<_> = table.global_ids().collect();
                        // A freshly created table has exactly one version, hence
                        // exactly one global ID.
                        assert_eq!(gids.len(), 1);
                        storage_collections_to_create.extend(gids);
                    }
                    CatalogItem::Source(source) => {
                        storage_collections_to_create.insert(source.global_id());
                    }
                    CatalogItem::MaterializedView(mv) => {
                        let mv_gid = mv.global_id_writes();
                        if let Some(target_id) = mv.replacement_target {
                            // A replacement MV writes into the existing shard of
                            // the collection it replaces, rather than a new one.
                            let target_gid = state.get_entry(&target_id).latest_global_id();
                            let shard_id =
                                state.storage_metadata().get_collection_shard(target_gid)?;
                            storage_collections_to_register.insert(mv_gid, shard_id);
                        } else {
                            storage_collections_to_create.insert(mv_gid);
                        }
                    }
                    CatalogItem::ContinualTask(ct) => {
                        storage_collections_to_create.insert(ct.global_id());
                    }
                    CatalogItem::Sink(sink) => {
                        storage_collections_to_create.insert(sink.global_id());
                    }
                    // These item types are not backed by storage collections.
                    CatalogItem::Log(_)
                    | CatalogItem::View(_)
                    | CatalogItem::Index(_)
                    | CatalogItem::Type(_)
                    | CatalogItem::Func(_)
                    | CatalogItem::Secret(_)
                    | CatalogItem::Connection(_) => (),
                }

                // Only system users may create items on system clusters.
                let system_user = session.map_or(false, |s| s.user().is_system_user());
                if !system_user {
                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
                        let cluster_name = state.clusters_by_id[&id].name.clone();
                        return Err(AdapterError::Catalog(Error::new(
                            ErrorKind::ReadOnlyCluster(cluster_name),
                        )));
                    }
                }

                // Privileges: owner's implicit privileges, applicable default
                // privileges, and (for progress sources) a SELECT grant to the
                // support role.
                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
                let default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        name.qualifiers.database_spec.id(),
                        Some(name.qualifiers.schema_spec.into()),
                        item.typ().into(),
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let progress_source_privilege = if item.is_progress_source() {
                    Some(MzAclItem {
                        grantee: MZ_SUPPORT_ROLE_ID,
                        grantor: owner_id,
                        acl_mode: AclMode::SELECT,
                    })
                } else {
                    None
                };
                let privileges: Vec<_> = merge_mz_acl_items(
                    owner_privileges
                        .into_iter()
                        .chain(default_privileges)
                        .chain(progress_source_privilege),
                )
                .collect();

                let temporary_oids = state.get_temporary_oids().collect();

                if item.is_temporary() {
                    // Temporary items must live in the ambient database's
                    // temporary schema and are kept in memory, not durably.
                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
                    {
                        return Err(AdapterError::Catalog(Error::new(
                            ErrorKind::InvalidTemporarySchema,
                        )));
                    }
                    let oid = tx.allocate_oid(&temporary_oids)?;

                    let schema_id = name.qualifiers.schema_spec.clone().into();
                    let item_type = item.typ();
                    let (create_sql, global_id, versions) = item.to_serialized();

                    let item = TemporaryItem {
                        id,
                        oid,
                        global_id,
                        schema_id,
                        name: name.item.clone(),
                        create_sql,
                        conn_id: item.conn_id().cloned(),
                        owner_id,
                        privileges: privileges.clone(),
                        extra_versions: versions,
                    };
                    temporary_item_updates.push((item, StateDiff::Addition));

                    info!(
                        "create temporary {} {} ({})",
                        item_type,
                        state.resolve_full_name(&name, None),
                        id
                    );
                } else {
                    // A durable item may not depend on a temporary item. The
                    // dependency may already be in the catalog or be created
                    // earlier in this same transaction (`temporary_ids`).
                    if let Some(temp_id) =
                        item.uses()
                            .iter()
                            .find(|id| match state.try_get_entry(*id) {
                                Some(entry) => entry.item().is_temporary(),
                                None => temporary_ids.contains(id),
                            })
                    {
                        let temp_item = state.get_entry(temp_id);
                        return Err(AdapterError::Catalog(Error::new(
                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
                        )));
                    }
                    // Non-system users may not create durable items in ambient
                    // (system) schemas.
                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
                        && !system_user
                    {
                        let schema_name = state
                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
                            .schema;
                        return Err(AdapterError::Catalog(Error::new(
                            ErrorKind::ReadOnlySystemSchema(schema_name),
                        )));
                    }
                    let schema_id = name.qualifiers.schema_spec.clone().into();
                    let item_type = item.typ();
                    let (create_sql, global_id, versions) = item.to_serialized();
                    tx.insert_user_item(
                        id,
                        global_id,
                        schema_id,
                        &name.item,
                        create_sql,
                        owner_id,
                        privileges.clone(),
                        &temporary_oids,
                        versions,
                    )?;
                    info!(
                        "create {} {} ({})",
                        item_type,
                        state.resolve_full_name(&name, None),
                        id
                    );
                }

                if Self::should_audit_log_item(&item) {
                    let name = Self::full_name_detail(
                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
                    );
                    // Pick an audit-log payload appropriate to the item type.
                    let details = match &item {
                        CatalogItem::Source(s) => {
                            // Ingestion exports run on the cluster of their
                            // parent ingestion; other sources use their own
                            // cluster (if any).
                            let cluster_id = match s.data_source {
                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                                    match state.get_entry(&ingestion_id).cluster_id() {
                                        Some(cluster_id) => Some(cluster_id.to_string()),
                                        None => None,
                                    }
                                }
                                _ => match item.cluster_id() {
                                    Some(cluster_id) => Some(cluster_id.to_string()),
                                    None => None,
                                },
                            };

                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
                                id: id.to_string(),
                                cluster_id,
                                name,
                                external_type: s.source_type().to_string(),
                            })
                        }
                        CatalogItem::Sink(s) => {
                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
                                id: id.to_string(),
                                cluster_id: Some(s.cluster_id.to_string()),
                                name,
                                external_type: s.sink_type().to_string(),
                            })
                        }
                        CatalogItem::Index(i) => {
                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
                                id: id.to_string(),
                                name,
                                cluster_id: i.cluster_id.to_string(),
                            })
                        }
                        CatalogItem::MaterializedView(mv) => {
                            EventDetails::CreateMaterializedViewV1(
                                mz_audit_log::CreateMaterializedViewV1 {
                                    id: id.to_string(),
                                    name,
                                    cluster_id: mv.cluster_id.to_string(),
                                    replacement_target_id: mv
                                        .replacement_target
                                        .map(|id| id.to_string()),
                                },
                            )
                        }
                        // Remaining types get the generic id/name payload.
                        CatalogItem::Table(_)
                        | CatalogItem::Log(_)
                        | CatalogItem::View(_)
                        | CatalogItem::Type(_)
                        | CatalogItem::Func(_)
                        | CatalogItem::Secret(_)
                        | CatalogItem::Connection(_)
                        | CatalogItem::ContinualTask(_) => {
                            EventDetails::IdFullNameV1(IdFullNameV1 {
                                id: id.to_string(),
                                name,
                            })
                        }
                    };
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Create,
                        catalog_type_to_audit_object_type(item.typ()),
                        details,
                    )?;
                }
            }
            // Create a network policy, rejecting duplicate and reserved names,
            // and record the creation in the audit log.
            Op::CreateNetworkPolicy {
                rules,
                name,
                owner_id,
            } => {
                if state.network_policies_by_name.contains_key(&name) {
                    return Err(AdapterError::PlanError(PlanError::Catalog(
                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
                    )));
                }
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedNetworkPolicyName(name),
                    )));
                }

                let owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::NetworkPolicy,
                    owner_id,
                )];
                // Network policies are not scoped to a database/schema.
                let default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::NetworkPolicy,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let privileges: Vec<_> =
                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
                        .collect();

                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
                let id = tx.insert_user_network_policy(
                    name.clone(),
                    rules,
                    privileges,
                    owner_id,
                    &temporary_oids,
                )?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::NetworkPolicy,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;

                info!("created network policy {name} ({id})");
            }
            // Set, update, or clear a comment on an object (or a sub-component
            // such as a column), then optionally audit-log the action.
            Op::Comment {
                object_id,
                sub_component,
                comment,
            } => {
                tx.update_comment(object_id, sub_component, comment)?;
                let entry = state.get_comment_id_entry(&object_id);
                // Objects without a catalog entry (e.g. non-item comment
                // targets) default to being logged.
                let should_log = entry
                    .map(|entry| Self::should_audit_log_item(entry.item()))
                    .unwrap_or(true);
                // Only audit-log when there is a session (a connection ID is
                // needed to resolve the audit-log name) and logging applies.
                if let (Some(conn_id), true) =
                    (session.map(|session| session.conn_id()), should_log)
                {
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Comment,
                        comment_id_to_audit_object_type(object_id),
                        EventDetails::IdNameV1(IdNameV1 {
                            // Debug-format the comment object ID; it has no
                            // canonical Display form.
                            id: format!("{object_id:?}"),
                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
                        }),
                    )?;
                }
            }
            // Replace the durable set of available upstream references for a
            // source, converting each reference into its durable representation.
            Op::UpdateSourceReferences {
                source_id,
                references,
            } => {
                tx.update_source_references(
                    source_id,
                    references
                        .references
                        .into_iter()
                        .map(|reference| reference.into())
                        .collect(),
                    references.updated_at,
                )?;
            }
1713 Op::DropObjects(drop_object_infos) => {
1714 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1716
1717 tx.drop_comments(&delta.comments)?;
1719
1720 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1722 delta
1723 .items
1724 .iter()
1725 .map(|id| id)
1726 .partition(|id| !state.get_entry(*id).item().is_temporary());
1727 tx.remove_items(&durable_items_to_drop)?;
1728 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1729 let entry = state.get_entry(&id);
1730 (entry.clone().into(), StateDiff::Retraction)
1731 }));
1732
1733 for item_id in delta.items {
1734 let entry = state.get_entry(&item_id);
1735
1736 if entry.item().is_storage_collection() && entry.replacement_target().is_none()
1740 {
1741 storage_collections_to_drop.extend(entry.global_ids());
1742 }
1743
1744 if state.source_references.contains_key(&item_id) {
1745 tx.remove_source_references(item_id)?;
1746 }
1747
1748 if Self::should_audit_log_item(entry.item()) {
1749 CatalogState::add_to_audit_log(
1750 &state.system_configuration,
1751 oracle_write_ts,
1752 session,
1753 tx,
1754 audit_events,
1755 EventType::Drop,
1756 catalog_type_to_audit_object_type(entry.item().typ()),
1757 EventDetails::IdFullNameV1(IdFullNameV1 {
1758 id: item_id.to_string(),
1759 name: Self::full_name_detail(&state.resolve_full_name(
1760 entry.name(),
1761 session.map(|session| session.conn_id()),
1762 )),
1763 }),
1764 )?;
1765 }
1766 info!(
1767 "drop {} {} ({})",
1768 entry.item_type(),
1769 state.resolve_full_name(entry.name(), entry.conn_id()),
1770 item_id
1771 );
1772 }
1773
1774 let schemas = delta
1776 .schemas
1777 .iter()
1778 .map(|(schema_spec, database_spec)| {
1779 (SchemaId::from(schema_spec), *database_spec)
1780 })
1781 .collect();
1782 tx.remove_schemas(&schemas)?;
1783
1784 for (schema_spec, database_spec) in delta.schemas {
1785 let schema = state.get_schema(
1786 &database_spec,
1787 &schema_spec,
1788 session
1789 .map(|session| session.conn_id())
1790 .unwrap_or(&SYSTEM_CONN_ID),
1791 );
1792
1793 let schema_id = SchemaId::from(schema_spec);
1794 let database_id = match database_spec {
1795 ResolvedDatabaseSpecifier::Ambient => None,
1796 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1797 };
1798
1799 CatalogState::add_to_audit_log(
1800 &state.system_configuration,
1801 oracle_write_ts,
1802 session,
1803 tx,
1804 audit_events,
1805 EventType::Drop,
1806 ObjectType::Schema,
1807 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1808 id: schema_id.to_string(),
1809 name: schema.name.schema.to_string(),
1810 database_name: database_id
1811 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1812 }),
1813 )?;
1814 }
1815
1816 tx.remove_databases(&delta.databases)?;
1818
1819 for database_id in delta.databases {
1820 let database = state.get_database(&database_id).clone();
1821
1822 CatalogState::add_to_audit_log(
1823 &state.system_configuration,
1824 oracle_write_ts,
1825 session,
1826 tx,
1827 audit_events,
1828 EventType::Drop,
1829 ObjectType::Database,
1830 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1831 id: database_id.to_string(),
1832 name: database.name.clone(),
1833 }),
1834 )?;
1835 }
1836
1837 tx.remove_user_roles(&delta.roles)?;
1839
1840 for role_id in delta.roles {
1841 let role = state
1842 .roles_by_id
1843 .get(&role_id)
1844 .expect("catalog out of sync");
1845
1846 CatalogState::add_to_audit_log(
1847 &state.system_configuration,
1848 oracle_write_ts,
1849 session,
1850 tx,
1851 audit_events,
1852 EventType::Drop,
1853 ObjectType::Role,
1854 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1855 id: role.id.to_string(),
1856 name: role.name.clone(),
1857 }),
1858 )?;
1859 info!("drop role {}", role.name());
1860 }
1861
1862 tx.remove_network_policies(&delta.network_policies)?;
1864
1865 for network_policy_id in delta.network_policies {
1866 let policy = state
1867 .network_policies_by_id
1868 .get(&network_policy_id)
1869 .expect("catalog out of sync");
1870
1871 CatalogState::add_to_audit_log(
1872 &state.system_configuration,
1873 oracle_write_ts,
1874 session,
1875 tx,
1876 audit_events,
1877 EventType::Drop,
1878 ObjectType::NetworkPolicy,
1879 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1880 id: policy.id.to_string(),
1881 name: policy.name.clone(),
1882 }),
1883 )?;
1884 info!("drop network policy {}", policy.name.clone());
1885 }
1886
1887 let replicas = delta.replicas.keys().copied().collect();
1889 tx.remove_cluster_replicas(&replicas)?;
1890
1891 for (replica_id, (cluster_id, reason)) in delta.replicas {
1892 let cluster = state.get_cluster(cluster_id);
1893 let replica = cluster.replica(replica_id).expect("Must exist");
1894
1895 let (reason, scheduling_policies) = reason.into_audit_log();
1896 let details =
1897 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1898 cluster_id: cluster_id.to_string(),
1899 cluster_name: cluster.name.clone(),
1900 replica_id: Some(replica_id.to_string()),
1901 replica_name: replica.name.clone(),
1902 reason,
1903 scheduling_policies,
1904 });
1905 CatalogState::add_to_audit_log(
1906 &state.system_configuration,
1907 oracle_write_ts,
1908 session,
1909 tx,
1910 audit_events,
1911 EventType::Drop,
1912 ObjectType::ClusterReplica,
1913 details,
1914 )?;
1915 }
1916
1917 tx.remove_clusters(&delta.clusters)?;
1919
1920 for cluster_id in delta.clusters {
1921 let cluster = state.get_cluster(cluster_id);
1922
1923 CatalogState::add_to_audit_log(
1924 &state.system_configuration,
1925 oracle_write_ts,
1926 session,
1927 tx,
1928 audit_events,
1929 EventType::Drop,
1930 ObjectType::Cluster,
1931 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1932 id: cluster.id.to_string(),
1933 name: cluster.name.clone(),
1934 }),
1935 )?;
1936 }
1937 }
            // Grant `role_id` to `member_id`. Rejects grants to reserved roles,
            // grants of non-grantable roles, and grants that would create a
            // cycle in the role-membership graph.
            Op::GrantRole {
                role_id,
                member_id,
                grantor_id,
            } => {
                state.ensure_not_reserved_role(&member_id)?;
                state.ensure_grantable_role(&role_id)?;
                // If `role_id` already (transitively) contains `member_id`,
                // adding this edge would make the membership graph circular.
                if state.collect_role_membership(&role_id).contains(&member_id) {
                    let group_role = state.get_role(&role_id);
                    let member_role = state.get_role(&member_id);
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::CircularRoleMembership {
                            role_name: group_role.name().to_string(),
                            member_name: member_role.name().to_string(),
                        },
                    )));
                }
                let mut member_role = state.get_role(&member_id).clone();
                member_role.membership.map.insert(role_id, grantor_id);
                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Grant,
                    ObjectType::Role,
                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
                        role_id: role_id.to_string(),
                        member_id: member_id.to_string(),
                        grantor_id: grantor_id.to_string(),
                        // Sessionless (internal) grants are attributed to the
                        // system role.
                        executed_by: session
                            .map(|session| session.authenticated_role_id())
                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
                            .to_string(),
                    }),
                )?;
            }
            // Revoke `role_id` from `member_id` and audit-log the revocation.
            Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            } => {
                state.ensure_not_reserved_role(&member_id)?;
                state.ensure_grantable_role(&role_id)?;
                let mut member_role = state.get_role(&member_id).clone();
                // Removing a membership that does not exist is a no-op here.
                member_role.membership.map.remove(&role_id);
                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Revoke,
                    ObjectType::Role,
                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
                        role_id: role_id.to_string(),
                        member_id: member_id.to_string(),
                        grantor_id: grantor_id.to_string(),
                        // Sessionless (internal) revokes are attributed to the
                        // system role.
                        executed_by: session
                            .map(|session| session.authenticated_role_id())
                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
                            .to_string(),
                    }),
                )?;
            }
            // Grant or revoke a single privilege on an object (or on the whole
            // system), then audit-log the change.
            Op::UpdatePrivilege {
                target_id,
                privilege,
                variant,
            } => {
                // Shared mutation applied to whichever privilege map the target
                // resolves to below.
                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
                    UpdatePrivilegeVariant::Grant => {
                        privileges.grant(privilege);
                    }
                    UpdatePrivilegeVariant::Revoke => {
                        privileges.revoke(&privilege);
                    }
                };
                match &target_id {
                    SystemObjectId::Object(object_id) => match object_id {
                        ObjectId::Cluster(id) => {
                            let mut cluster = state.get_cluster(*id).clone();
                            update_privilege_fn(&mut cluster.privileges);
                            tx.update_cluster(*id, cluster.into())?;
                        }
                        ObjectId::Database(id) => {
                            let mut database = state.get_database(id).clone();
                            update_privilege_fn(&mut database.privileges);
                            tx.update_database(*id, database.into())?;
                        }
                        ObjectId::NetworkPolicy(id) => {
                            let mut policy = state.get_network_policy(id).clone();
                            update_privilege_fn(&mut policy.privileges);
                            tx.update_network_policy(*id, policy.into())?;
                        }
                        ObjectId::Schema((database_spec, schema_spec)) => {
                            let schema_id = schema_spec.clone().into();
                            let mut schema = state
                                .get_schema(
                                    database_spec,
                                    schema_spec,
                                    session
                                        .map(|session| session.conn_id())
                                        .unwrap_or(&SYSTEM_CONN_ID),
                                )
                                .clone();
                            update_privilege_fn(&mut schema.privileges);
                            tx.update_schema(schema_id, schema.into())?;
                        }
                        ObjectId::Item(id) => {
                            let entry = state.get_entry(id);
                            let mut new_entry = entry.clone();
                            update_privilege_fn(&mut new_entry.privileges);
                            if !new_entry.item().is_temporary() {
                                tx.update_item(*id, new_entry.into())?;
                            } else {
                                // Temporary items are updated in memory as a
                                // retraction of the old entry plus an addition
                                // of the new one.
                                temporary_item_updates
                                    .push((entry.clone().into(), StateDiff::Retraction));
                                temporary_item_updates
                                    .push((new_entry.into(), StateDiff::Addition));
                            }
                        }
                        // Roles and replicas carry no privilege maps.
                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
                    },
                    SystemObjectId::System => {
                        // System-wide privileges are stored separately, keyed by
                        // (grantee, grantor).
                        let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
                        update_privilege_fn(&mut system_privileges);
                        let new_privilege =
                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
                        tx.set_system_privilege(
                            privilege.grantee,
                            privilege.grantor,
                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
                        )?;
                    }
                }
                let object_type = state.get_system_object_type(&target_id);
                let object_id_str = match &target_id {
                    SystemObjectId::System => "SYSTEM".to_string(),
                    SystemObjectId::Object(id) => id.to_string(),
                };
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    variant.into(),
                    system_object_type_to_audit_object_type(&object_type),
                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                        object_id: object_id_str,
                        grantee_id: privilege.grantee.to_string(),
                        grantor_id: privilege.grantor.to_string(),
                        privileges: privilege.acl_mode.to_string(),
                    }),
                )?;
            }
            // Alter a default privilege: apply the grant/revoke to a copy of the
            // in-memory default-privilege set, then persist the resulting ACL
            // mode for the affected (object, grantee) pair and audit-log it.
            Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant,
            } => {
                let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
                match variant {
                    UpdatePrivilegeVariant::Grant => default_privileges
                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
                    UpdatePrivilegeVariant::Revoke => {
                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
                    }
                }
                // The durable layer stores the final ACL mode (None clears it).
                let new_acl_mode = default_privileges
                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
                tx.set_default_privilege(
                    privilege_object.role_id,
                    privilege_object.database_id,
                    privilege_object.schema_id,
                    privilege_object.object_type,
                    privilege_acl_item.grantee,
                    new_acl_mode.cloned(),
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    variant.into(),
                    object_type_to_audit_object_type(privilege_object.object_type),
                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
                        role_id: privilege_object.role_id.to_string(),
                        database_id: privilege_object.database_id.map(|id| id.to_string()),
                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
                        grantee_id: privilege_acl_item.grantee.to_string(),
                        privileges: privilege_acl_item.acl_mode.to_string(),
                    }),
                )?;
            }
            // Rename a user cluster. System clusters are read-only; reserved
            // target names are rejected unless the caller opted out of the check.
            Op::RenameCluster {
                id,
                name,
                to_name,
                check_reserved_names,
            } => {
                if id.is_system() {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReadOnlyCluster(name.clone()),
                    )));
                }
                if check_reserved_names && is_reserved_name(&to_name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedClusterName(to_name),
                    )));
                }
                tx.rename_cluster(id, &name, &to_name)?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Alter,
                    ObjectType::Cluster,
                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
                        id: id.to_string(),
                        old_name: name.clone(),
                        new_name: to_name.clone(),
                    }),
                )?;
                info!("rename cluster {name} to {to_name}");
            }
            // Rename a cluster replica, rejecting reserved target names.
            Op::RenameClusterReplica {
                cluster_id,
                replica_id,
                name,
                to_name,
            } => {
                if is_reserved_name(&to_name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedReplicaName(to_name),
                    )));
                }
                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Alter,
                    ObjectType::ClusterReplica,
                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
                        cluster_id: cluster_id.to_string(),
                        replica_id: replica_id.to_string(),
                        // `name` is a qualified replica name; log only the
                        // replica component.
                        old_name: name.replica.as_str().to_string(),
                        new_name: to_name.clone(),
                    }),
                )?;
                info!("rename cluster replica {name} to {to_name}");
            }
2202 Op::RenameItem {
2203 id,
2204 to_name,
2205 current_full_name,
2206 } => {
2207 let mut updates = Vec::new();
2208
2209 let entry = state.get_entry(&id);
2210 if let CatalogItem::Type(_) = entry.item() {
2211 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2212 current_full_name.to_string(),
2213 ))));
2214 }
2215
2216 if entry.id().is_system() {
2217 let name = state
2218 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2219 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2220 name.to_string(),
2221 ))));
2222 }
2223
2224 let mut to_full_name = current_full_name.clone();
2225 to_full_name.item.clone_from(&to_name);
2226
2227 let mut to_qualified_name = entry.name().clone();
2228 to_qualified_name.item.clone_from(&to_name);
2229
2230 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2231 id: id.to_string(),
2232 old_name: Self::full_name_detail(¤t_full_name),
2233 new_name: Self::full_name_detail(&to_full_name),
2234 });
2235 if Self::should_audit_log_item(entry.item()) {
2236 CatalogState::add_to_audit_log(
2237 &state.system_configuration,
2238 oracle_write_ts,
2239 session,
2240 tx,
2241 audit_events,
2242 EventType::Alter,
2243 catalog_type_to_audit_object_type(entry.item().typ()),
2244 details,
2245 )?;
2246 }
2247
2248 let mut new_entry = entry.clone();
2250 new_entry.name.item.clone_from(&to_name);
2251 new_entry.item = entry
2252 .item()
2253 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2254 .map_err(|e| {
2255 Error::new(ErrorKind::from(AmbiguousRename {
2256 depender: state
2257 .resolve_full_name(entry.name(), entry.conn_id())
2258 .to_string(),
2259 dependee: state
2260 .resolve_full_name(entry.name(), entry.conn_id())
2261 .to_string(),
2262 message: e,
2263 }))
2264 })?;
2265
2266 for id in entry.referenced_by() {
2267 let dependent_item = state.get_entry(id);
2268 let mut to_entry = dependent_item.clone();
2269 to_entry.item = dependent_item
2270 .item()
2271 .rename_item_refs(
2272 current_full_name.clone(),
2273 to_full_name.item.clone(),
2274 false,
2275 )
2276 .map_err(|e| {
2277 Error::new(ErrorKind::from(AmbiguousRename {
2278 depender: state
2279 .resolve_full_name(
2280 dependent_item.name(),
2281 dependent_item.conn_id(),
2282 )
2283 .to_string(),
2284 dependee: state
2285 .resolve_full_name(entry.name(), entry.conn_id())
2286 .to_string(),
2287 message: e,
2288 }))
2289 })?;
2290
2291 if !to_entry.item().is_temporary() {
2292 tx.update_item(*id, to_entry.into())?;
2293 } else {
2294 temporary_item_updates
2295 .push((dependent_item.clone().into(), StateDiff::Retraction));
2296 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2297 }
2298 updates.push(*id);
2299 }
2300 if !new_entry.item().is_temporary() {
2301 tx.update_item(id, new_entry.into())?;
2302 } else {
2303 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2304 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2305 }
2306
2307 updates.push(id);
2308 for id in updates {
2309 Self::log_update(state, &id);
2310 }
2311 }
            // Rename a schema. Ambient (system) schemas cannot be renamed. Every
            // item in the schema, plus every item referencing one of them, has
            // its stored SQL rewritten to use the new schema name.
            Op::RenameSchema {
                database_spec,
                schema_spec,
                new_name,
                check_reserved_names,
            } => {
                if check_reserved_names && is_reserved_name(&new_name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedSchemaName(new_name),
                    )));
                }

                let conn_id = session
                    .map(|session| session.conn_id())
                    .unwrap_or(&SYSTEM_CONN_ID);

                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
                let cur_name = schema.name().schema.clone();

                // Only schemas inside a real database can be renamed.
                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::AmbientSchemaRename(cur_name),
                    )));
                };
                let database = state.get_database(&database_id);
                let database_name = &database.name;

                let mut updates = Vec::new();
                let mut items_to_update = BTreeMap::new();

                // Rewrites one item's schema references; deduplicates so an item
                // reached both as a schema member and as a dependent is only
                // processed once.
                let mut update_item = |id| {
                    if items_to_update.contains_key(id) {
                        return Ok(());
                    }

                    let entry = state.get_entry(id);

                    let mut new_entry = entry.clone();
                    new_entry.item = entry
                        .item
                        .rename_schema_refs(database_name, &cur_name, &new_name)
                        .map_err(|(s, _i)| {
                            Error::new(ErrorKind::from(AmbiguousRename {
                                depender: state
                                    .resolve_full_name(entry.name(), entry.conn_id())
                                    .to_string(),
                                dependee: format!("{database_name}.{cur_name}"),
                                message: format!("ambiguous reference to schema named {s}"),
                            }))
                        })?;

                    if !new_entry.item().is_temporary() {
                        items_to_update.insert(*id, new_entry.into());
                    } else {
                        // Temporary items bypass the durable transaction.
                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
                    }
                    updates.push(id);

                    Ok::<_, AdapterError>(())
                };

                for (_name, item_id) in &schema.items {
                    update_item(item_id)?;

                    for id in state.get_entry(item_id).referenced_by() {
                        update_item(id)?;
                    }
                }
                tx.update_items(items_to_update)?;

                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
                    let schema_name = schema.name().schema.clone();
                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
                    )));
                };

                let database_name = database_spec
                    .id()
                    .map(|id| state.get_database(&id).name.clone());
                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
                    id: schema_id.to_string(),
                    old_name: schema.name().schema.clone(),
                    new_name: new_name.clone(),
                    database_name,
                });
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Alter,
                    mz_audit_log::ObjectType::Schema,
                    details,
                )?;

                let mut new_schema = schema.clone();
                new_schema.name.schema.clone_from(&new_name);
                tx.update_schema(schema_id, new_schema.into())?;

                for id in updates {
                    Self::log_update(state, id);
                }
            }
2428 Op::UpdateOwner { id, new_owner } => {
2429 let conn_id = session
2430 .map(|session| session.conn_id())
2431 .unwrap_or(&SYSTEM_CONN_ID);
2432 let old_owner = state
2433 .get_owner_id(&id, conn_id)
2434 .expect("cannot update the owner of an object without an owner");
2435 match &id {
2436 ObjectId::Cluster(id) => {
2437 let mut cluster = state.get_cluster(*id).clone();
2438 if id.is_system() {
2439 return Err(AdapterError::Catalog(Error::new(
2440 ErrorKind::ReadOnlyCluster(cluster.name),
2441 )));
2442 }
2443 Self::update_privilege_owners(
2444 &mut cluster.privileges,
2445 cluster.owner_id,
2446 new_owner,
2447 );
2448 cluster.owner_id = new_owner;
2449 tx.update_cluster(*id, cluster.into())?;
2450 }
2451 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2452 let cluster = state.get_cluster(*cluster_id);
2453 let mut replica = cluster
2454 .replica(*replica_id)
2455 .expect("catalog out of sync")
2456 .clone();
2457 if replica_id.is_system() {
2458 return Err(AdapterError::Catalog(Error::new(
2459 ErrorKind::ReadOnlyClusterReplica(replica.name),
2460 )));
2461 }
2462 replica.owner_id = new_owner;
2463 tx.update_cluster_replica(*replica_id, replica.into())?;
2464 }
2465 ObjectId::Database(id) => {
2466 let mut database = state.get_database(id).clone();
2467 if id.is_system() {
2468 return Err(AdapterError::Catalog(Error::new(
2469 ErrorKind::ReadOnlyDatabase(database.name),
2470 )));
2471 }
2472 Self::update_privilege_owners(
2473 &mut database.privileges,
2474 database.owner_id,
2475 new_owner,
2476 );
2477 database.owner_id = new_owner;
2478 tx.update_database(*id, database.clone().into())?;
2479 }
2480 ObjectId::Schema((database_spec, schema_spec)) => {
2481 let schema_id: SchemaId = schema_spec.clone().into();
2482 let mut schema = state
2483 .get_schema(database_spec, schema_spec, conn_id)
2484 .clone();
2485 if schema_id.is_system() {
2486 let name = schema.name();
2487 let full_name = state.resolve_full_schema_name(name);
2488 return Err(AdapterError::Catalog(Error::new(
2489 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2490 )));
2491 }
2492 Self::update_privilege_owners(
2493 &mut schema.privileges,
2494 schema.owner_id,
2495 new_owner,
2496 );
2497 schema.owner_id = new_owner;
2498 tx.update_schema(schema_id, schema.into())?;
2499 }
2500 ObjectId::Item(id) => {
2501 let entry = state.get_entry(id);
2502 let mut new_entry = entry.clone();
2503 if id.is_system() {
2504 let full_name = state.resolve_full_name(
2505 new_entry.name(),
2506 session.map(|session| session.conn_id()),
2507 );
2508 return Err(AdapterError::Catalog(Error::new(
2509 ErrorKind::ReadOnlyItem(full_name.to_string()),
2510 )));
2511 }
2512 Self::update_privilege_owners(
2513 &mut new_entry.privileges,
2514 new_entry.owner_id,
2515 new_owner,
2516 );
2517 new_entry.owner_id = new_owner;
2518 if !new_entry.item().is_temporary() {
2519 tx.update_item(*id, new_entry.into())?;
2520 } else {
2521 temporary_item_updates
2522 .push((entry.clone().into(), StateDiff::Retraction));
2523 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2524 }
2525 }
2526 ObjectId::NetworkPolicy(id) => {
2527 let mut policy = state.get_network_policy(id).clone();
2528 if id.is_system() {
2529 return Err(AdapterError::Catalog(Error::new(
2530 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2531 )));
2532 }
2533 Self::update_privilege_owners(
2534 &mut policy.privileges,
2535 policy.owner_id,
2536 new_owner,
2537 );
2538 policy.owner_id = new_owner;
2539 tx.update_network_policy(*id, policy.into())?;
2540 }
2541 ObjectId::Role(_) => unreachable!("roles have no owner"),
2542 }
2543 let object_type = state.get_object_type(&id);
2544 CatalogState::add_to_audit_log(
2545 &state.system_configuration,
2546 oracle_write_ts,
2547 session,
2548 tx,
2549 audit_events,
2550 EventType::Alter,
2551 object_type_to_audit_object_type(object_type),
2552 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2553 object_id: id.to_string(),
2554 old_owner_id: old_owner.to_string(),
2555 new_owner_id: new_owner.to_string(),
2556 }),
2557 )?;
2558 }
2559 Op::UpdateClusterConfig { id, name, config } => {
2560 let mut cluster = state.get_cluster(id).clone();
2561 cluster.config = config;
2562 tx.update_cluster(id, cluster.into())?;
2563 info!("update cluster {}", name);
2564
2565 CatalogState::add_to_audit_log(
2566 &state.system_configuration,
2567 oracle_write_ts,
2568 session,
2569 tx,
2570 audit_events,
2571 EventType::Alter,
2572 ObjectType::Cluster,
2573 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2574 id: id.to_string(),
2575 name,
2576 }),
2577 )?;
2578 }
2579 Op::UpdateClusterReplicaConfig {
2580 replica_id,
2581 cluster_id,
2582 config,
2583 } => {
2584 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2585 info!("update replica {}", replica.name);
2586 tx.update_cluster_replica(
2587 replica_id,
2588 mz_catalog::durable::ClusterReplica {
2589 cluster_id,
2590 replica_id,
2591 name: replica.name.clone(),
2592 config: config.clone().into(),
2593 owner_id: replica.owner_id,
2594 },
2595 )?;
2596 }
2597 Op::UpdateItem { id, name, to_item } => {
2598 let mut entry = state.get_entry(&id).clone();
2599 entry.name = name.clone();
2600 entry.item = to_item.clone();
2601 tx.update_item(id, entry.into())?;
2602
2603 if Self::should_audit_log_item(&to_item) {
2604 let mut full_name = Self::full_name_detail(
2605 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2606 );
2607 full_name.item = name.item;
2608
2609 CatalogState::add_to_audit_log(
2610 &state.system_configuration,
2611 oracle_write_ts,
2612 session,
2613 tx,
2614 audit_events,
2615 EventType::Alter,
2616 catalog_type_to_audit_object_type(to_item.typ()),
2617 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2618 id: id.to_string(),
2619 name: full_name,
2620 }),
2621 )?;
2622 }
2623
2624 Self::log_update(state, &id);
2625 }
2626 Op::UpdateSystemConfiguration { name, value } => {
2627 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2628 tx.upsert_system_config(&name, parsed_value.clone())?;
2629 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2634 let with_0dt_deployment_max_wait =
2635 Duration::parse(VarInput::Flat(&parsed_value))
2636 .expect("parsing succeeded above");
2637 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2638 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2639 let with_0dt_deployment_ddl_check_interval =
2640 Duration::parse(VarInput::Flat(&parsed_value))
2641 .expect("parsing succeeded above");
2642 tx.set_0dt_deployment_ddl_check_interval(
2643 with_0dt_deployment_ddl_check_interval,
2644 )?;
2645 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2646 let panic_after_timeout =
2647 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2648 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2649 }
2650
2651 CatalogState::add_to_audit_log(
2652 &state.system_configuration,
2653 oracle_write_ts,
2654 session,
2655 tx,
2656 audit_events,
2657 EventType::Alter,
2658 ObjectType::System,
2659 EventDetails::SetV1(mz_audit_log::SetV1 {
2660 name,
2661 value: Some(value.borrow().to_vec().join(", ")),
2662 }),
2663 )?;
2664 }
2665 Op::ResetSystemConfiguration { name } => {
2666 tx.remove_system_config(&name);
2667 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2672 tx.reset_0dt_deployment_max_wait()?;
2673 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2674 tx.reset_0dt_deployment_ddl_check_interval()?;
2675 }
2676
2677 CatalogState::add_to_audit_log(
2678 &state.system_configuration,
2679 oracle_write_ts,
2680 session,
2681 tx,
2682 audit_events,
2683 EventType::Alter,
2684 ObjectType::System,
2685 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2686 )?;
2687 }
2688 Op::ResetAllSystemConfiguration => {
2689 tx.clear_system_configs();
2690 tx.reset_0dt_deployment_max_wait()?;
2691 tx.reset_0dt_deployment_ddl_check_interval()?;
2692
2693 CatalogState::add_to_audit_log(
2694 &state.system_configuration,
2695 oracle_write_ts,
2696 session,
2697 tx,
2698 audit_events,
2699 EventType::Alter,
2700 ObjectType::System,
2701 EventDetails::ResetAllV1,
2702 )?;
2703 }
2704 Op::WeirdStorageUsageUpdates {
2705 object_id,
2706 size_bytes,
2707 collection_timestamp,
2708 } => {
2709 let id = tx.allocate_storage_usage_ids()?;
2710 let metric =
2711 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2712 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2713 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2714 weird_builtin_table_update = Some(builtin_table_update);
2715 }
2716 Op::InjectAuditEvents { events } => {
2717 for event in events {
2718 let id = tx.allocate_audit_log_id()?;
2719 let ev = VersionedEvent::new(
2720 id,
2721 event.event_type,
2722 event.object_type,
2723 event.details,
2724 event.user,
2725 oracle_write_ts.into(),
2726 );
2727 audit_events.push(ev.clone());
2728 tx.insert_audit_log_event(ev);
2729 }
2730 }
2731 };
2732 Ok((weird_builtin_table_update, temporary_item_updates))
2733 }
2734
2735 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2736 let entry = state.get_entry(id);
2737 info!(
2738 "update {} {} ({})",
2739 entry.item_type(),
2740 state.resolve_full_name(entry.name(), entry.conn_id()),
2741 id
2742 );
2743 }
2744
2745 fn update_privilege_owners(
2749 privileges: &mut PrivilegeMap,
2750 old_owner: RoleId,
2751 new_owner: RoleId,
2752 ) {
2753 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2755
2756 let mut new_present = false;
2757 for privilege in flat_privileges.iter_mut() {
2758 if privilege.grantor == old_owner {
2761 privilege.grantor = new_owner;
2762 } else if privilege.grantor == new_owner {
2763 new_present = true;
2764 }
2765 if privilege.grantee == old_owner {
2767 privilege.grantee = new_owner;
2768 } else if privilege.grantee == new_owner {
2769 new_present = true;
2770 }
2771 }
2772
2773 if new_present {
2777 let privilege_map: BTreeMap<_, Vec<_>> =
2779 flat_privileges
2780 .into_iter()
2781 .fold(BTreeMap::new(), |mut accum, privilege| {
2782 accum
2783 .entry((privilege.grantee, privilege.grantor))
2784 .or_default()
2785 .push(privilege);
2786 accum
2787 });
2788
2789 flat_privileges = privilege_map
2791 .into_iter()
2792 .map(|((grantee, grantor), values)|
2793 values.into_iter().fold(
2795 MzAclItem::empty(grantee, grantor),
2796 |mut accum, mz_aclitem| {
2797 accum.acl_mode =
2798 accum.acl_mode.union(mz_aclitem.acl_mode);
2799 accum
2800 },
2801 ))
2802 .collect();
2803 }
2804
2805 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2806 }
2807}
2808
2809fn tx_replace_item(
2818 tx: &mut Transaction<'_>,
2819 state: &CatalogState,
2820 id: CatalogItemId,
2821 new_entry: CatalogEntry,
2822) -> Result<(), AdapterError> {
2823 let new_id = new_entry.id;
2824
2825 for use_id in new_entry.referenced_by() {
2827 if tx.get_item(use_id).is_none() {
2829 continue;
2830 }
2831
2832 let mut dependent = state.get_entry(use_id).clone();
2833 dependent.item = dependent.item.replace_item_refs(id, new_id);
2834 tx.update_item(*use_id, dependent.into())?;
2835 }
2836
2837 let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2839 let new_comment_id = new_entry.comment_object_id();
2840 if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2841 tx.drop_comments(&[old_comment_id].into())?;
2842 for (sub, comment) in comments {
2843 tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2844 }
2845 }
2846
2847 let mz_catalog::durable::Item {
2848 id: _,
2849 oid,
2850 global_id,
2851 schema_id,
2852 name,
2853 create_sql,
2854 owner_id,
2855 privileges,
2856 extra_versions,
2857 } = new_entry.into();
2858
2859 tx.remove_item(id)?;
2860 tx.insert_item(
2861 new_id,
2862 oid,
2863 global_id,
2864 schema_id,
2865 &name,
2866 create_sql,
2867 owner_id,
2868 privileges,
2869 extra_versions,
2870 )?;
2871
2872 Ok(())
2873}
2874
2875fn apply_replacement_audit_events(
2877 state: &CatalogState,
2878 target: &CatalogEntry,
2879 replacement: &CatalogEntry,
2880) -> Vec<(EventType, EventDetails)> {
2881 let mut events = Vec::new();
2882
2883 let target_name = state.resolve_full_name(target.name(), target.conn_id());
2884 let target_id_name = IdFullNameV1 {
2885 id: target.id().to_string(),
2886 name: Catalog::full_name_detail(&target_name),
2887 };
2888 let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2889 let replacement_id_name = IdFullNameV1 {
2890 id: replacement.id().to_string(),
2891 name: Catalog::full_name_detail(&replacement_name),
2892 };
2893
2894 if Catalog::should_audit_log_item(&replacement.item) {
2895 events.push((
2896 EventType::Drop,
2897 EventDetails::IdFullNameV1(replacement_id_name.clone()),
2898 ));
2899 }
2900
2901 if Catalog::should_audit_log_item(&target.item) {
2902 events.push((
2903 EventType::Alter,
2904 EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2905 target: target_id_name.clone(),
2906 replacement: replacement_id_name,
2907 }),
2908 ));
2909
2910 if let Some(old_cluster_id) = target.cluster_id()
2911 && let Some(new_cluster_id) = replacement.cluster_id()
2912 && old_cluster_id != new_cluster_id
2913 {
2914 events.push((
2917 EventType::Alter,
2918 EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2919 id: replacement.id().to_string(),
2920 name: target_id_name.name,
2921 old_cluster_id: old_cluster_id.to_string(),
2922 new_cluster_id: new_cluster_id.to_string(),
2923 }),
2924 ));
2925 }
2926 }
2927
2928 events
2929}
2930
/// The set of objects removed by a catalog transaction, bucketed by object
/// kind so callers can process each kind appropriately.
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    /// Comment ids attached to the dropped objects; their comments go too.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, each mapped to the database it lives in.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop.
    pub clusters: BTreeSet<ClusterId>,
    /// Replicas to drop, each with its cluster and the reason for the drop.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop.
    pub roles: BTreeSet<RoleId>,
    /// Items to drop, in the order they were requested (plus implicit drops).
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
2949
2950impl ObjectsToDrop {
2951 pub fn generate(
2952 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2953 state: &CatalogState,
2954 session: Option<&ConnMeta>,
2955 ) -> Result<Self, AdapterError> {
2956 let mut delta = ObjectsToDrop::default();
2957
2958 for drop_object_info in drop_object_infos {
2959 delta.add_item(drop_object_info, state, session)?;
2960 }
2961
2962 Ok(delta)
2963 }
2964
2965 fn add_item(
2966 &mut self,
2967 drop_object_info: DropObjectInfo,
2968 state: &CatalogState,
2969 session: Option<&ConnMeta>,
2970 ) -> Result<(), AdapterError> {
2971 self.comments
2972 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2973
2974 match drop_object_info {
2975 DropObjectInfo::Database(database_id) => {
2976 let database = &state.database_by_id[&database_id];
2977 if database_id.is_system() {
2978 return Err(AdapterError::Catalog(Error::new(
2979 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2980 )));
2981 }
2982
2983 self.databases.insert(database_id);
2984 }
2985 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2986 let schema = state.get_schema(
2987 &database_spec,
2988 &schema_spec,
2989 session
2990 .map(|session| session.conn_id())
2991 .unwrap_or(&SYSTEM_CONN_ID),
2992 );
2993 let schema_id: SchemaId = schema_spec.into();
2994 if schema_id.is_system() {
2995 let name = schema.name();
2996 let full_name = state.resolve_full_schema_name(name);
2997 return Err(AdapterError::Catalog(Error::new(
2998 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2999 )));
3000 }
3001
3002 self.schemas.insert(schema_spec, database_spec);
3003 }
3004 DropObjectInfo::Role(role_id) => {
3005 let name = state.get_role(&role_id).name().to_string();
3006 if role_id.is_system() || role_id.is_predefined() {
3007 return Err(AdapterError::Catalog(Error::new(
3008 ErrorKind::ReservedRoleName(name.clone()),
3009 )));
3010 }
3011 state.ensure_not_reserved_role(&role_id)?;
3012
3013 self.roles.insert(role_id);
3014 }
3015 DropObjectInfo::Cluster(cluster_id) => {
3016 let cluster = state.get_cluster(cluster_id);
3017 let name = &cluster.name;
3018 if cluster_id.is_system() {
3019 return Err(AdapterError::Catalog(Error::new(
3020 ErrorKind::ReadOnlyCluster(name.clone()),
3021 )));
3022 }
3023
3024 self.clusters.insert(cluster_id);
3025 }
3026 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3027 let cluster = state.get_cluster(cluster_id);
3028 let replica = cluster.replica(replica_id).expect("Must exist");
3029
3030 self.replicas
3031 .insert(replica.replica_id, (cluster.id, reason));
3032
3033 for (_id, entry) in state.get_entries() {
3037 if let CatalogItem::MaterializedView(mv) = entry.item() {
3038 if mv.target_replica == Some(replica_id)
3039 && !self.items.contains(&entry.id())
3040 {
3041 tracing::warn!(
3042 "implicitly dropping materialized view {} because target replica was dropped",
3043 entry.name().item,
3044 );
3045 self.items.push(entry.id());
3046 }
3047 }
3048 }
3049 }
3050 DropObjectInfo::Item(item_id) => {
3051 let entry = state.get_entry(&item_id);
3052 if item_id.is_system() {
3053 let name = entry.name();
3054 let full_name =
3055 state.resolve_full_name(name, session.map(|session| session.conn_id()));
3056 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3057 full_name.to_string(),
3058 ))));
3059 }
3060
3061 self.items.push(item_id);
3062 }
3063 DropObjectInfo::NetworkPolicy(network_policy_id) => {
3064 let policy = state.get_network_policy(&network_policy_id);
3065 let name = &policy.name;
3066 if network_policy_id.is_system() {
3067 return Err(AdapterError::Catalog(Error::new(
3068 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3069 )));
3070 }
3071
3072 self.network_policies.insert(network_policy_id);
3073 }
3074 }
3075
3076 Ok(())
3077 }
3078}
3079
#[cfg(test)]
mod tests {
    use mz_catalog::SYSTEM_CONN_ID;
    use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;
    use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::DEFAULT_SCHEMA;
    use mz_sql::catalog::CatalogDatabase;
    use mz_sql::names::{
        ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
    };
    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;

    use crate::catalog::{Catalog, Op};
    use crate::session::DEFAULT_DATABASE_NAME;

    // Ownership handover must rewrite grantor/grantee and coalesce any
    // duplicate `(grantee, grantor)` pairs by unioning their ACL modes.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        // Case: old and new owner both appear as grantors of grants to the
        // same grantee; the two grants must merge into one whose ACL mode is
        // the union of both.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: other_role,
                grantor: old_owner,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: other_role,
                grantor: new_owner,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: other_role,
                grantor: new_owner,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );

        // Case: old and new owner both appear as grantees from the same
        // grantor; the merge happens on the grantee side.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: old_owner,
                grantor: other_role,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: new_owner,
                grantor: other_role,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: new_owner,
                grantor: other_role,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );

        // Case: the owners appear as both grantor and grantee simultaneously
        // (self-grants); both sides get rewritten and the entries still merge.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: old_owner,
                grantor: old_owner,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: new_owner,
                grantor: new_owner,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: new_owner,
                grantor: new_owner,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );
    }

    // An incremental dry run that resumes from a previous dry run's snapshot
    // must produce the same catalog state as replaying all ops from scratch.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_transact_incremental_dry_run_processes_only_new_ops() {
        Catalog::with_debug(|catalog| async move {
            // Resolve the default database/schema to qualify the test tables.
            let database = catalog
                .resolve_database(DEFAULT_DATABASE_NAME)
                .expect("default database");
            let database_name = database.name.clone();
            let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
            let schema = catalog
                .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
                .expect("default schema");
            let schema_name = schema.name.schema.clone();
            let schema_spec = schema.id.clone();

            // Pre-allocate ids for the two tables created by the dry runs.
            let (id_t1, global_id_t1) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t1");
            let (id_t2, global_id_t2) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t2");

            let oracle_write_ts = catalog.current_upper().await;

            // Builds a `CreateItem` op for an empty table named `name`.
            let make_table_op = |id, global_id, name: &str| Op::CreateItem {
                id,
                name: QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: database_spec.clone(),
                        schema_spec: schema_spec.clone(),
                    },
                    item: name.to_string(),
                },
                item: CatalogItem::Table(Table {
                    create_sql: Some(format!(
                        "CREATE TABLE {database_name}.{schema_name}.{name} ()"
                    )),
                    desc: VersionedRelationDesc::new(RelationDesc::empty()),
                    collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                    conn_id: None,
                    resolved_ids: ResolvedIds::empty(),
                    custom_logical_compaction_window: None,
                    is_retained_metrics_object: false,
                    data_source: TableDataSource::TableWrites { defaults: vec![] },
                }),
                owner_id: MZ_SYSTEM_ROLE_ID,
            };

            let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
            let op_t2 = make_table_op(id_t2, global_id_t2, "t2");

            let base_state = catalog.state().clone();

            // Dry run 1: apply only `t1` against the base state, keeping the
            // resulting snapshot for the incremental follow-up.
            let (state_after_t1, snapshot_after_t1) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("first dry run");

            assert!(
                state_after_t1.try_get_entry(&id_t1).is_some(),
                "t1 should exist after first dry run"
            );
            assert_eq!(
                state_after_t1
                    .try_get_entry(&id_t1)
                    .expect("t1 entry")
                    .name()
                    .item,
                "t1"
            );
            assert!(
                state_after_t1.try_get_entry(&id_t2).is_none(),
                "t2 should NOT exist after first dry run"
            );

            // Dry run 2: apply only `t2`, resuming from run 1's snapshot.
            let (state_incremental, _) = catalog
                .transact_incremental_dry_run(
                    &state_after_t1,
                    vec![op_t2.clone()],
                    None,
                    Some(snapshot_after_t1),
                    oracle_write_ts,
                )
                .await
                .expect("second dry run");

            assert!(
                state_incremental.try_get_entry(&id_t1).is_some(),
                "t1 should exist in incremental result"
            );
            assert!(
                state_incremental.try_get_entry(&id_t2).is_some(),
                "t2 should exist in incremental result"
            );

            // Reference run: apply both ops in one dry run from the base state.
            let (state_all_at_once, _) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone(), op_t2.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("all-at-once dry run");

            assert!(
                state_all_at_once.try_get_entry(&id_t1).is_some(),
                "t1 should exist in all-at-once result"
            );
            assert!(
                state_all_at_once.try_get_entry(&id_t2).is_some(),
                "t2 should exist in all-at-once result"
            );

            // The incremental and all-at-once paths must agree on the entries.
            let inc_t1 = state_incremental.try_get_entry(&id_t1).expect("inc t1");
            let all_t1 = state_all_at_once.try_get_entry(&id_t1).expect("all t1");
            assert_eq!(inc_t1.name(), all_t1.name());
            assert_eq!(inc_t1.owner_id, all_t1.owner_id);

            let inc_t2 = state_incremental.try_get_entry(&id_t2).expect("inc t2");
            let all_t2 = state_all_at_once.try_get_entry(&id_t2).expect("all t2");
            assert_eq!(inc_t2.name(), all_t2.name());
            assert_eq!(inc_t2.owner_id, all_t2.owner_id);

            catalog.expire().await;
        })
        .await
    }
}