1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35 StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50 AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
51 CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
52 DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
53 RoleVars,
54};
55use mz_sql::names::{
56 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
57 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
58};
59use mz_sql::plan::{NetworkPolicyRule, PlanError};
60use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
61use mz_sql::session::vars::OwnedVarInput;
62use mz_sql::session::vars::{Value as VarValue, VarInput};
63use mz_sql::{DEFAULT_SCHEMA, rbac};
64use mz_sql_parser::ast::{QualifiedReplica, Value};
65use mz_storage_client::storage_collections::StorageCollections;
66use serde::{Deserialize, Serialize};
67use tracing::{info, trace};
68
69use crate::AdapterError;
70use crate::catalog::state::LocalExpressionCache;
71use crate::catalog::{
72 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
73 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
74 is_reserved_role_name, object_type_to_audit_object_type,
75 system_object_type_to_audit_object_type,
76};
77use crate::coord::ConnMeta;
78use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
79use crate::coord::cluster_scheduling::SchedulingDecision;
80use crate::util::ResultExt;
81
/// An audit-log event supplied by the caller of a catalog transaction rather
/// than derived from another [`Op`]. Carried by [`Op::InjectAuditEvents`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InjectedAuditEvent {
    /// The kind of action the event records.
    pub event_type: EventType,
    /// The type of object the event refers to.
    pub object_type: ObjectType,
    /// Event-specific payload.
    pub details: EventDetails,
    /// Optional user name to attribute the event to.
    // NOTE(review): how `None` is treated is decided where the op is applied — confirm there.
    pub user: Option<String>,
}
93
/// A single catalog mutation that can be submitted to [`Catalog::transact`].
///
/// The exact semantics of every variant are defined by its arm in
/// `Catalog::transact_op`.
#[derive(Debug, Clone)]
pub enum Op {
    /// Updates the retain-history setting of item `id` using `value` and
    /// `window`. System items are rejected as read-only.
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Updates the timestamp interval of item `id` using `value` and
    /// `interval`. System items are rejected as read-only.
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        value: Option<Value>,
        interval: Duration,
    },
    /// Replaces the attributes and variables of role `id`.
    ///
    /// When `nopassword` is set the role's password is cleared; otherwise a
    /// password present in `attributes` is set, and an absent one leaves the
    /// password unchanged. `name` is used for the audit log entry.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    /// Replaces the rules of network policy `id`. Reserved `name`s are
    /// rejected; `owner_id` is not consulted when the op is applied.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds column `name` to table `id` and records `new_global_id` as the
    /// collection for the resulting table version.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    /// Applies materialized view `replacement_id` as the replacement of
    /// materialized view `id`. Both ids must refer to materialized views.
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    /// Creates a user database called `name` together with its default schema.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Creates a user schema in the given database. Reserved schema names and
    /// the ambient database specifier are rejected.
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Creates a user role. Reserved role names are rejected.
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    /// Creates a user cluster with the given id, introspection sources, and
    /// configuration. Reserved cluster names are rejected.
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Creates a replica of cluster `cluster_id`. `reason` is recorded in the
    /// audit log for managed replicas. Reserved replica names are rejected.
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    /// Creates a network policy.
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Updates the comment on an object or one of its sub-components.
    // NOTE(review): presumably `comment: None` removes the comment — confirm in the handler.
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the listed objects.
    DropObjects(Vec<DropObjectInfo>),
    /// Grants membership in `role_id` to `member_id` on behalf of `grantor_id`.
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    /// Renames a cluster to `to_name`.
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    /// Renames a cluster replica to `to_name`.
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    /// Renames a catalog item to `to_name`.
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    /// Renames a schema to `new_name`.
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    /// Changes the owner of an object to `new_owner`.
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    /// Updates a privilege on `target_id`; `variant` selects the kind of update.
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    /// Updates a default privilege; `variant` selects the kind of update.
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    /// Revokes membership in `role_id` from `member_id`.
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    /// Replaces the configuration of cluster `id`.
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    /// Replaces the configuration of a cluster replica.
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    /// Replaces the definition of item `id` with `to_item`.
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    /// Replaces the recorded references of source `source_id`.
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    /// Sets the system configuration parameter `name` to `value`.
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    /// Resets the system configuration parameter `name`.
    ResetSystemConfiguration {
        name: String,
    },
    /// Resets all system configuration parameters.
    ResetAllSystemConfiguration,
    /// Records a storage usage measurement.
    // NOTE(review): `transact_op` can return an ad-hoc builtin table update
    // (`weird_builtin_table_update`); presumably this variant is what produces
    // it — confirm in the handler.
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Adds caller-provided events to the audit log.
    InjectAuditEvents {
        events: Vec<InjectedAuditEvent>,
    },
}
268
/// Identifies one object to drop as part of [`Op::DropObjects`].
///
/// Mirrors the variants of [`ObjectId`]; the cluster replica variant
/// additionally carries the reason for the drop.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    /// Drop the cluster with this id.
    Cluster(ClusterId),
    /// Drop the replica `ReplicaId` of cluster `ClusterId`, for the given reason.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    /// Drop the database with this id.
    Database(DatabaseId),
    /// Drop the schema identified by database and schema specifiers.
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    /// Drop the role with this id.
    Role(RoleId),
    /// Drop the catalog item with this id.
    Item(CatalogItemId),
    /// Drop the network policy with this id.
    NetworkPolicy(NetworkPolicyId),
}
282
283impl DropObjectInfo {
284 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
287 match id {
288 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
289 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
290 cluster_id,
291 replica_id,
292 ReplicaCreateDropReason::Manual,
293 )),
294 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
295 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
296 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
297 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
298 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
299 }
300 }
301
302 fn to_object_id(&self) -> ObjectId {
305 match &self {
306 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
307 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
308 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
309 }
310 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
311 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
312 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
313 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
314 DropObjectInfo::NetworkPolicy(network_policy_id) => {
315 ObjectId::NetworkPolicy(network_policy_id.clone())
316 }
317 }
318 }
319}
320
/// Why a cluster replica is being created or dropped; recorded in the audit log.
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The replica was created or dropped by an explicit request
    /// (reported as `CreateOrDropClusterReplicaReasonV1::Manual`).
    Manual,
    /// The replica was created or dropped by cluster scheduling
    /// (reported as `CreateOrDropClusterReplicaReasonV1::Schedule`); carries
    /// the scheduling decisions that led to the change.
    ClusterScheduling(Vec<SchedulingDecision>),
}
333
334impl ReplicaCreateDropReason {
335 pub fn into_audit_log(
336 self,
337 ) -> (
338 CreateOrDropClusterReplicaReasonV1,
339 Option<SchedulingDecisionsWithReasonsV2>,
340 ) {
341 let (reason, scheduling_policies) = match self {
342 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
343 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
344 CreateOrDropClusterReplicaReasonV1::Schedule,
345 Some(scheduling_decisions),
346 ),
347 };
348 (
349 reason,
350 scheduling_policies
351 .as_ref()
352 .map(SchedulingDecision::reasons_to_audit_log_reasons),
353 )
354 }
355}
356
/// Everything a successful [`Catalog::transact`] call hands back to its caller.
pub struct TransactionResult {
    /// Builtin table updates produced while applying the ops.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog state updates produced while applying the ops.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit log events recorded while applying the ops.
    pub audit_events: Vec<VersionedEvent>,
}
363
/// Selects how `Catalog::transact_inner` treats storage state.
#[derive(Debug, Clone, Copy)]
enum TransactInnerMode {
    /// Used by `Catalog::transact`: if storage collections are supplied, their
    /// state is prepared as part of the transaction.
    Commit,
    /// Used by `Catalog::transact_incremental_dry_run`: storage state must not
    /// be prepared, and no storage collections may be supplied.
    DryRun,
}
378
379impl Catalog {
380 fn should_audit_log_item(item: &CatalogItem) -> bool {
381 !item.is_temporary()
382 }
383
384 fn temporary_ids(
387 &self,
388 ops: &[Op],
389 temporary_drops: BTreeSet<(&ConnectionId, String)>,
390 ) -> Result<BTreeSet<CatalogItemId>, Error> {
391 let mut creating = BTreeSet::new();
392 let mut temporary_ids = BTreeSet::new();
393 for op in ops.iter() {
394 if let Op::CreateItem {
395 id,
396 name,
397 item,
398 owner_id: _,
399 } = op
400 {
401 if let Some(conn_id) = item.conn_id() {
402 if self.item_exists_in_temp_schemas(conn_id, &name.item)
403 && !temporary_drops.contains(&(conn_id, name.item.clone()))
404 || creating.contains(&(conn_id, &name.item))
405 {
406 return Err(
407 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
408 );
409 } else {
410 creating.insert((conn_id, &name.item));
411 temporary_ids.insert(id.clone());
412 }
413 }
414 }
415 }
416 Ok(temporary_ids)
417 }
418
419 #[instrument(name = "catalog::transact")]
420 pub async fn transact(
421 &mut self,
422 storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
425 oracle_write_ts: mz_repr::Timestamp,
426 session: Option<&ConnMeta>,
427 ops: Vec<Op>,
428 ) -> Result<TransactionResult, AdapterError> {
429 trace!("transact: {:?}", ops);
430 fail::fail_point!("catalog_transact", |arg| {
431 Err(AdapterError::Unstructured(anyhow::anyhow!(
432 "failpoint: {arg:?}"
433 )))
434 });
435
436 let drop_ids: BTreeSet<CatalogItemId> = ops
437 .iter()
438 .filter_map(|op| match op {
439 Op::DropObjects(drop_object_infos) => {
440 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
441 let item_ids = ids.filter_map(|id| match id {
442 ObjectId::Item(id) => Some(id),
443 _ => None,
444 });
445 Some(item_ids)
446 }
447 _ => None,
448 })
449 .flatten()
450 .collect();
451 let temporary_drops = drop_ids
452 .iter()
453 .filter_map(|id| {
454 let entry = self.get_entry(id);
455 match entry.item().conn_id() {
456 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
457 None => None,
458 }
459 })
460 .collect();
461
462 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
463 let mut builtin_table_updates = vec![];
464 let mut catalog_updates = vec![];
465 let mut audit_events = vec![];
466 let mut storage = self.storage().await;
467 let mut tx = storage
468 .transaction()
469 .await
470 .unwrap_or_terminate("starting catalog transaction");
471
472 let new_state = Self::transact_inner(
473 TransactInnerMode::Commit,
474 storage_collections,
475 oracle_write_ts,
476 session,
477 ops,
478 temporary_ids,
479 &mut builtin_table_updates,
480 &mut catalog_updates,
481 &mut audit_events,
482 &mut tx,
483 &self.state,
484 )
485 .await?;
486
487 tx.commit(oracle_write_ts)
492 .await
493 .unwrap_or_terminate("catalog storage transaction commit must succeed");
494
495 drop(storage);
498 if let Some(new_state) = new_state {
499 self.transient_revision += 1;
500 self.state = new_state;
501 }
502
503 Ok(TransactionResult {
504 builtin_table_updates,
505 catalog_updates,
506 audit_events,
507 })
508 }
509
510 pub async fn transact_incremental_dry_run(
525 &self,
526 base_state: &CatalogState,
527 ops: Vec<Op>,
528 session: Option<&ConnMeta>,
529 prev_snapshot: Option<Snapshot>,
530 oracle_write_ts: mz_repr::Timestamp,
531 ) -> Result<(CatalogState, Snapshot), AdapterError> {
532 let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;
535
536 let mut builtin_table_updates = vec![];
537 let mut catalog_updates = vec![];
538 let mut audit_events = vec![];
539 let mut storage = self.storage().await;
540 let mut tx = if let Some(snapshot) = prev_snapshot {
541 storage
544 .transaction_from_snapshot(snapshot)
545 .unwrap_or_terminate("starting catalog transaction from snapshot")
546 } else {
547 storage
550 .transaction()
551 .await
552 .unwrap_or_terminate("starting catalog transaction")
553 };
554
555 let new_state = Self::transact_inner(
557 TransactInnerMode::DryRun,
558 None,
559 oracle_write_ts,
560 session,
561 ops,
562 temporary_ids,
563 &mut builtin_table_updates,
564 &mut catalog_updates,
565 &mut audit_events,
566 &mut tx,
567 base_state,
568 )
569 .await?;
570
571 let new_snapshot = tx.current_snapshot();
574
575 drop(storage);
577
578 let state = new_state.unwrap_or_else(|| base_state.clone());
580 Ok((state, new_snapshot))
581 }
582
583 fn extract_expressions_from_ops(
587 ops: &[Op],
588 optimizer_features: &OptimizerFeatures,
589 ) -> BTreeMap<GlobalId, LocalExpressions> {
590 let mut exprs = BTreeMap::new();
591
592 for op in ops {
593 if let Op::CreateItem { item, .. } = op {
594 match item {
595 CatalogItem::View(view) => {
596 exprs.insert(
597 view.global_id,
598 LocalExpressions {
599 local_mir: (*view.locally_optimized_expr).clone(),
600 optimizer_features: optimizer_features.clone(),
601 },
602 );
603 }
604 CatalogItem::MaterializedView(mv) => {
605 exprs.insert(
606 mv.global_id_writes(),
607 LocalExpressions {
608 local_mir: (*mv.locally_optimized_expr).clone(),
609 optimizer_features: optimizer_features.clone(),
610 },
611 );
612 }
613 CatalogItem::Table(_)
614 | CatalogItem::Source(_)
615 | CatalogItem::Log(_)
616 | CatalogItem::Sink(_)
617 | CatalogItem::Index(_)
618 | CatalogItem::Type(_)
619 | CatalogItem::Func(_)
620 | CatalogItem::Secret(_)
621 | CatalogItem::Connection(_)
622 | CatalogItem::ContinualTask(_) => {}
623 }
624 }
625 }
626
627 exprs
628 }
629
630 #[instrument(name = "catalog::transact_inner")]
638 async fn transact_inner(
639 mode: TransactInnerMode,
640 storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
641 oracle_write_ts: mz_repr::Timestamp,
642 session: Option<&ConnMeta>,
643 ops: Vec<Op>,
644 temporary_ids: BTreeSet<CatalogItemId>,
645 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
646 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
647 audit_events: &mut Vec<VersionedEvent>,
648 tx: &mut Transaction<'_>,
649 state: &CatalogState,
650 ) -> Result<Option<CatalogState>, AdapterError> {
651 let mut preliminary_state = Cow::Borrowed(state);
678
679 let mut state = Cow::Borrowed(state);
681
682 if ops.is_empty() {
683 return Ok(None);
684 }
685
686 let optimizer_features = OptimizerFeatures::from(state.system_config());
689 let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
690
691 let mut storage_collections_to_create = BTreeSet::new();
692 let mut storage_collections_to_drop = BTreeSet::new();
693 let mut storage_collections_to_register = BTreeMap::new();
694
695 let mut updates = Vec::new();
696
697 for op in ops {
698 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
699 oracle_write_ts,
700 session,
701 op,
702 &temporary_ids,
703 audit_events,
704 tx,
705 &*preliminary_state,
706 &mut storage_collections_to_create,
707 &mut storage_collections_to_drop,
708 &mut storage_collections_to_register,
709 )
710 .await?;
711
712 if let Some(builtin_table_update) = weird_builtin_table_update {
721 builtin_table_updates.push(builtin_table_update);
722 }
723
724 let upper = tx.upper();
729 let temporary_item_updates =
730 temporary_item_updates
731 .into_iter()
732 .map(|(item, diff)| StateUpdate {
733 kind: StateUpdateKind::TemporaryItem(item),
734 ts: upper,
735 diff,
736 });
737
738 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
739 op_updates.extend(temporary_item_updates);
740 if !op_updates.is_empty() {
741 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
744 let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
745 .to_mut()
746 .apply_updates(op_updates.clone(), &mut local_expr_cache)
747 .await;
748 }
749 updates.append(&mut op_updates);
750 }
751
752 if !updates.is_empty() {
753 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
754 let (op_builtin_table_updates, op_catalog_updates) = state
755 .to_mut()
756 .apply_updates(updates.clone(), &mut local_expr_cache)
757 .await;
758 let op_builtin_table_updates = state
759 .to_mut()
760 .resolve_builtin_table_updates(op_builtin_table_updates);
761 builtin_table_updates.extend(op_builtin_table_updates);
762 parsed_catalog_updates.extend(op_catalog_updates);
763 }
764
765 match mode {
766 TransactInnerMode::Commit => {
767 if let Some(c) = storage_collections {
769 c.prepare_state(
770 tx,
771 storage_collections_to_create,
772 storage_collections_to_drop,
773 storage_collections_to_register,
774 )
775 .await?;
776 }
777 }
778 TransactInnerMode::DryRun => {
779 debug_assert!(
780 storage_collections.is_none(),
781 "dry-run mode must not prepare storage state"
782 );
783 }
784 }
785
786 let updates = tx.get_and_commit_op_updates();
787 if !updates.is_empty() {
788 let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
789 let (op_builtin_table_updates, op_catalog_updates) = state
790 .to_mut()
791 .apply_updates(updates.clone(), &mut local_expr_cache)
792 .await;
793 let op_builtin_table_updates = state
794 .to_mut()
795 .resolve_builtin_table_updates(op_builtin_table_updates);
796 builtin_table_updates.extend(op_builtin_table_updates);
797 parsed_catalog_updates.extend(op_catalog_updates);
798 }
799
800 match state {
801 Cow::Owned(state) => Ok(Some(state)),
802 Cow::Borrowed(_) => Ok(None),
803 }
804 }
805
806 #[instrument]
814 async fn transact_op(
815 oracle_write_ts: mz_repr::Timestamp,
816 session: Option<&ConnMeta>,
817 op: Op,
818 temporary_ids: &BTreeSet<CatalogItemId>,
819 audit_events: &mut Vec<VersionedEvent>,
820 tx: &mut Transaction<'_>,
821 state: &CatalogState,
822 storage_collections_to_create: &mut BTreeSet<GlobalId>,
823 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
824 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
825 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
826 let mut weird_builtin_table_update = None;
827 let mut temporary_item_updates = Vec::new();
828
829 match op {
830 Op::AlterRetainHistory { id, value, window } => {
831 let entry = state.get_entry(&id);
832 if id.is_system() {
833 let name = entry.name();
834 let full_name =
835 state.resolve_full_name(name, session.map(|session| session.conn_id()));
836 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
837 full_name.to_string(),
838 ))));
839 }
840
841 let mut new_entry = entry.clone();
842 let previous = new_entry
843 .item
844 .update_retain_history(value.clone(), window)
845 .map_err(|_| {
846 AdapterError::Catalog(Error::new(ErrorKind::Internal(
847 "planner should have rejected invalid alter retain history item type"
848 .to_string(),
849 )))
850 })?;
851
852 if Self::should_audit_log_item(new_entry.item()) {
853 let details =
854 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
855 id: id.to_string(),
856 old_history: previous.map(|previous| previous.to_string()),
857 new_history: value.map(|v| v.to_string()),
858 });
859 CatalogState::add_to_audit_log(
860 &state.system_configuration,
861 oracle_write_ts,
862 session,
863 tx,
864 audit_events,
865 EventType::Alter,
866 catalog_type_to_audit_object_type(new_entry.item().typ()),
867 details,
868 )?;
869 }
870
871 tx.update_item(id, new_entry.into())?;
872
873 Self::log_update(state, &id);
874 }
875 Op::AlterSourceTimestampInterval {
876 id,
877 value,
878 interval,
879 } => {
880 let entry = state.get_entry(&id);
881 if id.is_system() {
882 let name = entry.name();
883 let full_name =
884 state.resolve_full_name(name, session.map(|session| session.conn_id()));
885 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
886 full_name.to_string(),
887 ))));
888 }
889
890 let mut new_entry = entry.clone();
891 new_entry
892 .item
893 .update_timestamp_interval(value, interval)
894 .map_err(|_| {
895 AdapterError::Catalog(Error::new(ErrorKind::Internal(
896 "planner should have rejected invalid alter timestamp interval item type"
897 .to_string(),
898 )))
899 })?;
900
901 tx.update_item(id, new_entry.into())?;
902
903 Self::log_update(state, &id);
904 }
905 Op::AlterRole {
906 id,
907 name,
908 attributes,
909 nopassword,
910 vars,
911 } => {
912 state.ensure_not_reserved_role(&id)?;
913
914 let mut existing_role = state.get_role(&id).clone();
915 let password = attributes.password.clone();
916 let scram_iterations = attributes
917 .scram_iterations
918 .unwrap_or_else(|| state.system_config().scram_iterations());
919 existing_role.attributes = attributes.into();
920 existing_role.vars = vars;
921 let password_action = if nopassword {
922 PasswordAction::Clear
923 } else if let Some(password) = password {
924 PasswordAction::Set(PasswordConfig {
925 password,
926 scram_iterations,
927 })
928 } else {
929 PasswordAction::NoChange
930 };
931 tx.update_role(id, existing_role.into(), password_action)?;
932
933 CatalogState::add_to_audit_log(
934 &state.system_configuration,
935 oracle_write_ts,
936 session,
937 tx,
938 audit_events,
939 EventType::Alter,
940 ObjectType::Role,
941 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
942 id: id.to_string(),
943 name: name.clone(),
944 }),
945 )?;
946
947 info!("update role {name} ({id})");
948 }
949 Op::AlterNetworkPolicy {
950 id,
951 rules,
952 name,
953 owner_id: _owner_id,
954 } => {
955 let existing_policy = state.get_network_policy(&id).clone();
956 let mut policy: NetworkPolicy = existing_policy.into();
957 policy.rules = rules;
958 if is_reserved_name(&name) {
959 return Err(AdapterError::Catalog(Error::new(
960 ErrorKind::ReservedNetworkPolicyName(name),
961 )));
962 }
963 tx.update_network_policy(id, policy.clone())?;
964
965 CatalogState::add_to_audit_log(
966 &state.system_configuration,
967 oracle_write_ts,
968 session,
969 tx,
970 audit_events,
971 EventType::Alter,
972 ObjectType::NetworkPolicy,
973 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
974 id: id.to_string(),
975 name: name.clone(),
976 }),
977 )?;
978
979 info!("update network policy {name} ({id})");
980 }
981 Op::AlterAddColumn {
982 id,
983 new_global_id,
984 name,
985 typ,
986 sql,
987 } => {
988 let mut new_entry = state.get_entry(&id).clone();
989 let version = new_entry.item.add_column(name, typ, sql)?;
990 let shard_id = state
993 .storage_metadata()
994 .get_collection_shard(new_entry.latest_global_id())?;
995
996 let CatalogItem::Table(table) = &mut new_entry.item else {
998 return Err(AdapterError::Unsupported("adding columns to non-Table"));
999 };
1000 table.collections.insert(version, new_global_id);
1001
1002 tx.update_item(id, new_entry.into())?;
1003 storage_collections_to_register.insert(new_global_id, shard_id);
1004 }
1005 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1006 let mut new_entry = state.get_entry(&id).clone();
1007 let replacement = state.get_entry(&replacement_id);
1008
1009 let new_audit_events =
1010 apply_replacement_audit_events(state, &new_entry, replacement);
1011
1012 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1013 return Err(AdapterError::internal(
1014 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1015 "id must refer to a materialized view",
1016 ));
1017 };
1018 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1019 return Err(AdapterError::internal(
1020 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1021 "replacement_id must refer to a materialized view",
1022 ));
1023 };
1024
1025 mv.apply_replacement(replacement_mv.clone());
1026
1027 tx.remove_item(replacement_id)?;
1028
1029 new_entry.id = replacement_id;
1030 tx_replace_item(tx, state, id, new_entry)?;
1031
1032 let comment_id = CommentObjectId::MaterializedView(replacement_id);
1033 tx.drop_comments(&[comment_id].into())?;
1034
1035 for (event_type, details) in new_audit_events {
1036 CatalogState::add_to_audit_log(
1037 &state.system_configuration,
1038 oracle_write_ts,
1039 session,
1040 tx,
1041 audit_events,
1042 event_type,
1043 ObjectType::MaterializedView,
1044 details,
1045 )?;
1046 }
1047 }
1048 Op::CreateDatabase { name, owner_id } => {
1049 let database_owner_privileges = vec![rbac::owner_privilege(
1050 mz_sql::catalog::ObjectType::Database,
1051 owner_id,
1052 )];
1053 let database_default_privileges = state
1054 .default_privileges
1055 .get_applicable_privileges(
1056 owner_id,
1057 None,
1058 None,
1059 mz_sql::catalog::ObjectType::Database,
1060 )
1061 .map(|item| item.mz_acl_item(owner_id));
1062 let database_privileges: Vec<_> = merge_mz_acl_items(
1063 database_owner_privileges
1064 .into_iter()
1065 .chain(database_default_privileges),
1066 )
1067 .collect();
1068
1069 let schema_owner_privileges = vec![rbac::owner_privilege(
1070 mz_sql::catalog::ObjectType::Schema,
1071 owner_id,
1072 )];
1073 let schema_default_privileges = state
1074 .default_privileges
1075 .get_applicable_privileges(
1076 owner_id,
1077 None,
1078 None,
1079 mz_sql::catalog::ObjectType::Schema,
1080 )
1081 .map(|item| item.mz_acl_item(owner_id))
1082 .chain(std::iter::once(MzAclItem {
1084 grantee: RoleId::Public,
1085 grantor: owner_id,
1086 acl_mode: AclMode::USAGE,
1087 }));
1088 let schema_privileges: Vec<_> = merge_mz_acl_items(
1089 schema_owner_privileges
1090 .into_iter()
1091 .chain(schema_default_privileges),
1092 )
1093 .collect();
1094
1095 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1096 let (database_id, _) = tx.insert_user_database(
1097 &name,
1098 owner_id,
1099 database_privileges.clone(),
1100 &temporary_oids,
1101 )?;
1102 let (schema_id, _) = tx.insert_user_schema(
1103 database_id,
1104 DEFAULT_SCHEMA,
1105 owner_id,
1106 schema_privileges.clone(),
1107 &temporary_oids,
1108 )?;
1109 CatalogState::add_to_audit_log(
1110 &state.system_configuration,
1111 oracle_write_ts,
1112 session,
1113 tx,
1114 audit_events,
1115 EventType::Create,
1116 ObjectType::Database,
1117 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1118 id: database_id.to_string(),
1119 name: name.clone(),
1120 }),
1121 )?;
1122 info!("create database {}", name);
1123
1124 CatalogState::add_to_audit_log(
1125 &state.system_configuration,
1126 oracle_write_ts,
1127 session,
1128 tx,
1129 audit_events,
1130 EventType::Create,
1131 ObjectType::Schema,
1132 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1133 id: schema_id.to_string(),
1134 name: DEFAULT_SCHEMA.to_string(),
1135 database_name: Some(name),
1136 }),
1137 )?;
1138 }
1139 Op::CreateSchema {
1140 database_id,
1141 schema_name,
1142 owner_id,
1143 } => {
1144 if is_reserved_name(&schema_name) {
1145 return Err(AdapterError::Catalog(Error::new(
1146 ErrorKind::ReservedSchemaName(schema_name),
1147 )));
1148 }
1149 let database_id = match database_id {
1150 ResolvedDatabaseSpecifier::Id(id) => id,
1151 ResolvedDatabaseSpecifier::Ambient => {
1152 return Err(AdapterError::Catalog(Error::new(
1153 ErrorKind::ReadOnlySystemSchema(schema_name),
1154 )));
1155 }
1156 };
1157 let owner_privileges = vec![rbac::owner_privilege(
1158 mz_sql::catalog::ObjectType::Schema,
1159 owner_id,
1160 )];
1161 let default_privileges = state
1162 .default_privileges
1163 .get_applicable_privileges(
1164 owner_id,
1165 Some(database_id),
1166 None,
1167 mz_sql::catalog::ObjectType::Schema,
1168 )
1169 .map(|item| item.mz_acl_item(owner_id));
1170 let privileges: Vec<_> =
1171 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1172 .collect();
1173 let (schema_id, _) = tx.insert_user_schema(
1174 database_id,
1175 &schema_name,
1176 owner_id,
1177 privileges.clone(),
1178 &state.get_temporary_oids().collect(),
1179 )?;
1180 CatalogState::add_to_audit_log(
1181 &state.system_configuration,
1182 oracle_write_ts,
1183 session,
1184 tx,
1185 audit_events,
1186 EventType::Create,
1187 ObjectType::Schema,
1188 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1189 id: schema_id.to_string(),
1190 name: schema_name.clone(),
1191 database_name: Some(state.database_by_id[&database_id].name.clone()),
1192 }),
1193 )?;
1194 }
1195 Op::CreateRole { name, attributes } => {
1196 if is_reserved_role_name(&name) {
1197 return Err(AdapterError::Catalog(Error::new(
1198 ErrorKind::ReservedRoleName(name),
1199 )));
1200 }
1201 let membership = RoleMembership::new();
1202 let vars = RoleVars::default();
1203 let (id, _) = tx.insert_user_role(
1204 name.clone(),
1205 attributes.clone(),
1206 membership.clone(),
1207 vars.clone(),
1208 &state.get_temporary_oids().collect(),
1209 )?;
1210 CatalogState::add_to_audit_log(
1211 &state.system_configuration,
1212 oracle_write_ts,
1213 session,
1214 tx,
1215 audit_events,
1216 EventType::Create,
1217 ObjectType::Role,
1218 EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1219 id: id.to_string(),
1220 name: name.clone(),
1221 auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1222 AutoProvisionSource::Oidc => "oidc".to_string(),
1223 AutoProvisionSource::Frontegg => "frontegg".to_string(),
1224 AutoProvisionSource::None => "none".to_string(),
1225 }),
1226 }),
1227 )?;
1228 info!("create role {}", name);
1229 }
1230 Op::CreateCluster {
1231 id,
1232 name,
1233 introspection_sources,
1234 owner_id,
1235 config,
1236 } => {
1237 if is_reserved_name(&name) {
1238 return Err(AdapterError::Catalog(Error::new(
1239 ErrorKind::ReservedClusterName(name),
1240 )));
1241 }
1242 let owner_privileges = vec![rbac::owner_privilege(
1243 mz_sql::catalog::ObjectType::Cluster,
1244 owner_id,
1245 )];
1246 let default_privileges = state
1247 .default_privileges
1248 .get_applicable_privileges(
1249 owner_id,
1250 None,
1251 None,
1252 mz_sql::catalog::ObjectType::Cluster,
1253 )
1254 .map(|item| item.mz_acl_item(owner_id));
1255 let privileges: Vec<_> =
1256 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1257 .collect();
1258 let introspection_source_ids: Vec<_> = introspection_sources
1259 .iter()
1260 .map(|introspection_source| {
1261 Transaction::allocate_introspection_source_index_id(
1262 &id,
1263 introspection_source.variant,
1264 )
1265 })
1266 .collect();
1267
1268 let introspection_sources = introspection_sources
1269 .into_iter()
1270 .zip_eq(introspection_source_ids)
1271 .map(|(log, (item_id, gid))| (log, item_id, gid))
1272 .collect();
1273
1274 tx.insert_user_cluster(
1275 id,
1276 &name,
1277 introspection_sources,
1278 owner_id,
1279 privileges.clone(),
1280 config.clone().into(),
1281 &state.get_temporary_oids().collect(),
1282 )?;
1283 CatalogState::add_to_audit_log(
1284 &state.system_configuration,
1285 oracle_write_ts,
1286 session,
1287 tx,
1288 audit_events,
1289 EventType::Create,
1290 ObjectType::Cluster,
1291 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1292 id: id.to_string(),
1293 name: name.clone(),
1294 }),
1295 )?;
1296 info!("create cluster {}", name);
1297 }
1298 Op::CreateClusterReplica {
1299 cluster_id,
1300 name,
1301 config,
1302 owner_id,
1303 reason,
1304 } => {
1305 if is_reserved_name(&name) {
1306 return Err(AdapterError::Catalog(Error::new(
1307 ErrorKind::ReservedReplicaName(name),
1308 )));
1309 }
1310 let cluster = state.get_cluster(cluster_id);
1311 let id =
1312 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1313 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1314 size,
1315 billed_as,
1316 internal,
1317 ..
1318 }) = &config.location
1319 {
1320 let (reason, scheduling_policies) = reason.into_audit_log();
1321 let details = EventDetails::CreateClusterReplicaV4(
1322 mz_audit_log::CreateClusterReplicaV4 {
1323 cluster_id: cluster_id.to_string(),
1324 cluster_name: cluster.name.clone(),
1325 replica_id: Some(id.to_string()),
1326 replica_name: name.clone(),
1327 logical_size: size.clone(),
1328 billed_as: billed_as.clone(),
1329 internal: *internal,
1330 reason,
1331 scheduling_policies,
1332 },
1333 );
1334 CatalogState::add_to_audit_log(
1335 &state.system_configuration,
1336 oracle_write_ts,
1337 session,
1338 tx,
1339 audit_events,
1340 EventType::Create,
1341 ObjectType::ClusterReplica,
1342 details,
1343 )?;
1344 }
1345 }
1346 Op::CreateItem {
1347 id,
1348 name,
1349 item,
1350 owner_id,
1351 } => {
1352 state.check_unstable_dependencies(&item)?;
1353
1354 match &item {
1355 CatalogItem::Table(table) => {
1356 let gids: Vec<_> = table.global_ids().collect();
1357 assert_eq!(gids.len(), 1);
1358 storage_collections_to_create.extend(gids);
1359 }
1360 CatalogItem::Source(source) => {
1361 storage_collections_to_create.insert(source.global_id());
1362 }
1363 CatalogItem::MaterializedView(mv) => {
1364 let mv_gid = mv.global_id_writes();
1365 if let Some(target_id) = mv.replacement_target {
1366 let target_gid = state.get_entry(&target_id).latest_global_id();
1367 let shard_id =
1368 state.storage_metadata().get_collection_shard(target_gid)?;
1369 storage_collections_to_register.insert(mv_gid, shard_id);
1370 } else {
1371 storage_collections_to_create.insert(mv_gid);
1372 }
1373 }
1374 CatalogItem::ContinualTask(ct) => {
1375 storage_collections_to_create.insert(ct.global_id());
1376 }
1377 CatalogItem::Sink(sink) => {
1378 storage_collections_to_create.insert(sink.global_id());
1379 }
1380 CatalogItem::Log(_)
1381 | CatalogItem::View(_)
1382 | CatalogItem::Index(_)
1383 | CatalogItem::Type(_)
1384 | CatalogItem::Func(_)
1385 | CatalogItem::Secret(_)
1386 | CatalogItem::Connection(_) => (),
1387 }
1388
1389 let system_user = session.map_or(false, |s| s.user().is_system_user());
1390 if !system_user {
1391 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1392 let cluster_name = state.clusters_by_id[&id].name.clone();
1393 return Err(AdapterError::Catalog(Error::new(
1394 ErrorKind::ReadOnlyCluster(cluster_name),
1395 )));
1396 }
1397 }
1398
1399 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1400 let default_privileges = state
1401 .default_privileges
1402 .get_applicable_privileges(
1403 owner_id,
1404 name.qualifiers.database_spec.id(),
1405 Some(name.qualifiers.schema_spec.into()),
1406 item.typ().into(),
1407 )
1408 .map(|item| item.mz_acl_item(owner_id));
1409 let progress_source_privilege = if item.is_progress_source() {
1411 Some(MzAclItem {
1412 grantee: MZ_SUPPORT_ROLE_ID,
1413 grantor: owner_id,
1414 acl_mode: AclMode::SELECT,
1415 })
1416 } else {
1417 None
1418 };
1419 let privileges: Vec<_> = merge_mz_acl_items(
1420 owner_privileges
1421 .into_iter()
1422 .chain(default_privileges)
1423 .chain(progress_source_privilege),
1424 )
1425 .collect();
1426
1427 let temporary_oids = state.get_temporary_oids().collect();
1428
1429 if item.is_temporary() {
1430 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1431 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1432 {
1433 return Err(AdapterError::Catalog(Error::new(
1434 ErrorKind::InvalidTemporarySchema,
1435 )));
1436 }
1437 let oid = tx.allocate_oid(&temporary_oids)?;
1438
1439 let schema_id = name.qualifiers.schema_spec.clone().into();
1440 let item_type = item.typ();
1441 let (create_sql, global_id, versions) = item.to_serialized();
1442
1443 let item = TemporaryItem {
1444 id,
1445 oid,
1446 global_id,
1447 schema_id,
1448 name: name.item.clone(),
1449 create_sql,
1450 conn_id: item.conn_id().cloned(),
1451 owner_id,
1452 privileges: privileges.clone(),
1453 extra_versions: versions,
1454 };
1455 temporary_item_updates.push((item, StateDiff::Addition));
1456
1457 info!(
1458 "create temporary {} {} ({})",
1459 item_type,
1460 state.resolve_full_name(&name, None),
1461 id
1462 );
1463 } else {
1464 if let Some(temp_id) =
1465 item.uses()
1466 .iter()
1467 .find(|id| match state.try_get_entry(*id) {
1468 Some(entry) => entry.item().is_temporary(),
1469 None => temporary_ids.contains(id),
1470 })
1471 {
1472 let temp_item = state.get_entry(temp_id);
1473 return Err(AdapterError::Catalog(Error::new(
1474 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1475 )));
1476 }
1477 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1478 && !system_user
1479 {
1480 let schema_name = state
1481 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1482 .schema;
1483 return Err(AdapterError::Catalog(Error::new(
1484 ErrorKind::ReadOnlySystemSchema(schema_name),
1485 )));
1486 }
1487 let schema_id = name.qualifiers.schema_spec.clone().into();
1488 let item_type = item.typ();
1489 let (create_sql, global_id, versions) = item.to_serialized();
1490 tx.insert_user_item(
1491 id,
1492 global_id,
1493 schema_id,
1494 &name.item,
1495 create_sql,
1496 owner_id,
1497 privileges.clone(),
1498 &temporary_oids,
1499 versions,
1500 )?;
1501 info!(
1502 "create {} {} ({})",
1503 item_type,
1504 state.resolve_full_name(&name, None),
1505 id
1506 );
1507 }
1508
1509 if Self::should_audit_log_item(&item) {
1510 let name = Self::full_name_detail(
1511 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1512 );
1513 let details = match &item {
1514 CatalogItem::Source(s) => {
1515 let cluster_id = match s.data_source {
1516 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1519 match state.get_entry(&ingestion_id).cluster_id() {
1520 Some(cluster_id) => Some(cluster_id.to_string()),
1521 None => None,
1522 }
1523 }
1524 _ => match item.cluster_id() {
1525 Some(cluster_id) => Some(cluster_id.to_string()),
1526 None => None,
1527 },
1528 };
1529
1530 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1531 id: id.to_string(),
1532 cluster_id,
1533 name,
1534 external_type: s.source_type().to_string(),
1535 })
1536 }
1537 CatalogItem::Sink(s) => {
1538 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1539 id: id.to_string(),
1540 cluster_id: Some(s.cluster_id.to_string()),
1541 name,
1542 external_type: s.sink_type().to_string(),
1543 })
1544 }
1545 CatalogItem::Index(i) => {
1546 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1547 id: id.to_string(),
1548 name,
1549 cluster_id: i.cluster_id.to_string(),
1550 })
1551 }
1552 CatalogItem::MaterializedView(mv) => {
1553 EventDetails::CreateMaterializedViewV1(
1554 mz_audit_log::CreateMaterializedViewV1 {
1555 id: id.to_string(),
1556 name,
1557 cluster_id: mv.cluster_id.to_string(),
1558 replacement_target_id: mv
1559 .replacement_target
1560 .map(|id| id.to_string()),
1561 },
1562 )
1563 }
1564 CatalogItem::Table(_)
1565 | CatalogItem::Log(_)
1566 | CatalogItem::View(_)
1567 | CatalogItem::Type(_)
1568 | CatalogItem::Func(_)
1569 | CatalogItem::Secret(_)
1570 | CatalogItem::Connection(_)
1571 | CatalogItem::ContinualTask(_) => {
1572 EventDetails::IdFullNameV1(IdFullNameV1 {
1573 id: id.to_string(),
1574 name,
1575 })
1576 }
1577 };
1578 CatalogState::add_to_audit_log(
1579 &state.system_configuration,
1580 oracle_write_ts,
1581 session,
1582 tx,
1583 audit_events,
1584 EventType::Create,
1585 catalog_type_to_audit_object_type(item.typ()),
1586 details,
1587 )?;
1588 }
1589 }
1590 Op::CreateNetworkPolicy {
1591 rules,
1592 name,
1593 owner_id,
1594 } => {
1595 if state.network_policies_by_name.contains_key(&name) {
1596 return Err(AdapterError::PlanError(PlanError::Catalog(
1597 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1598 )));
1599 }
1600 if is_reserved_name(&name) {
1601 return Err(AdapterError::Catalog(Error::new(
1602 ErrorKind::ReservedNetworkPolicyName(name),
1603 )));
1604 }
1605
1606 let owner_privileges = vec![rbac::owner_privilege(
1607 mz_sql::catalog::ObjectType::NetworkPolicy,
1608 owner_id,
1609 )];
1610 let default_privileges = state
1611 .default_privileges
1612 .get_applicable_privileges(
1613 owner_id,
1614 None,
1615 None,
1616 mz_sql::catalog::ObjectType::NetworkPolicy,
1617 )
1618 .map(|item| item.mz_acl_item(owner_id));
1619 let privileges: Vec<_> =
1620 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1621 .collect();
1622
1623 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1624 let id = tx.insert_user_network_policy(
1625 name.clone(),
1626 rules,
1627 privileges,
1628 owner_id,
1629 &temporary_oids,
1630 )?;
1631
1632 CatalogState::add_to_audit_log(
1633 &state.system_configuration,
1634 oracle_write_ts,
1635 session,
1636 tx,
1637 audit_events,
1638 EventType::Create,
1639 ObjectType::NetworkPolicy,
1640 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1641 id: id.to_string(),
1642 name: name.clone(),
1643 }),
1644 )?;
1645
1646 info!("created network policy {name} ({id})");
1647 }
1648 Op::Comment {
1649 object_id,
1650 sub_component,
1651 comment,
1652 } => {
1653 tx.update_comment(object_id, sub_component, comment)?;
1654 let entry = state.get_comment_id_entry(&object_id);
1655 let should_log = entry
1656 .map(|entry| Self::should_audit_log_item(entry.item()))
1657 .unwrap_or(true);
1659 if let (Some(conn_id), true) =
1662 (session.map(|session| session.conn_id()), should_log)
1663 {
1664 CatalogState::add_to_audit_log(
1665 &state.system_configuration,
1666 oracle_write_ts,
1667 session,
1668 tx,
1669 audit_events,
1670 EventType::Comment,
1671 comment_id_to_audit_object_type(object_id),
1672 EventDetails::IdNameV1(IdNameV1 {
1673 id: format!("{object_id:?}"),
1675 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1676 }),
1677 )?;
1678 }
1679 }
1680 Op::UpdateSourceReferences {
1681 source_id,
1682 references,
1683 } => {
1684 tx.update_source_references(
1685 source_id,
1686 references
1687 .references
1688 .into_iter()
1689 .map(|reference| reference.into())
1690 .collect(),
1691 references.updated_at,
1692 )?;
1693 }
1694 Op::DropObjects(drop_object_infos) => {
1695 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1697
1698 tx.drop_comments(&delta.comments)?;
1700
1701 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1703 delta
1704 .items
1705 .iter()
1706 .map(|id| id)
1707 .partition(|id| !state.get_entry(*id).item().is_temporary());
1708 tx.remove_items(&durable_items_to_drop)?;
1709 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1710 let entry = state.get_entry(&id);
1711 (entry.clone().into(), StateDiff::Retraction)
1712 }));
1713
1714 for item_id in delta.items {
1715 let entry = state.get_entry(&item_id);
1716
1717 if entry.item().is_storage_collection() {
1718 storage_collections_to_drop.extend(entry.global_ids());
1719 }
1720
1721 if state.source_references.contains_key(&item_id) {
1722 tx.remove_source_references(item_id)?;
1723 }
1724
1725 if Self::should_audit_log_item(entry.item()) {
1726 CatalogState::add_to_audit_log(
1727 &state.system_configuration,
1728 oracle_write_ts,
1729 session,
1730 tx,
1731 audit_events,
1732 EventType::Drop,
1733 catalog_type_to_audit_object_type(entry.item().typ()),
1734 EventDetails::IdFullNameV1(IdFullNameV1 {
1735 id: item_id.to_string(),
1736 name: Self::full_name_detail(&state.resolve_full_name(
1737 entry.name(),
1738 session.map(|session| session.conn_id()),
1739 )),
1740 }),
1741 )?;
1742 }
1743 info!(
1744 "drop {} {} ({})",
1745 entry.item_type(),
1746 state.resolve_full_name(entry.name(), entry.conn_id()),
1747 item_id
1748 );
1749 }
1750
1751 let schemas = delta
1753 .schemas
1754 .iter()
1755 .map(|(schema_spec, database_spec)| {
1756 (SchemaId::from(schema_spec), *database_spec)
1757 })
1758 .collect();
1759 tx.remove_schemas(&schemas)?;
1760
1761 for (schema_spec, database_spec) in delta.schemas {
1762 let schema = state.get_schema(
1763 &database_spec,
1764 &schema_spec,
1765 session
1766 .map(|session| session.conn_id())
1767 .unwrap_or(&SYSTEM_CONN_ID),
1768 );
1769
1770 let schema_id = SchemaId::from(schema_spec);
1771 let database_id = match database_spec {
1772 ResolvedDatabaseSpecifier::Ambient => None,
1773 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1774 };
1775
1776 CatalogState::add_to_audit_log(
1777 &state.system_configuration,
1778 oracle_write_ts,
1779 session,
1780 tx,
1781 audit_events,
1782 EventType::Drop,
1783 ObjectType::Schema,
1784 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1785 id: schema_id.to_string(),
1786 name: schema.name.schema.to_string(),
1787 database_name: database_id
1788 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1789 }),
1790 )?;
1791 }
1792
1793 tx.remove_databases(&delta.databases)?;
1795
1796 for database_id in delta.databases {
1797 let database = state.get_database(&database_id).clone();
1798
1799 CatalogState::add_to_audit_log(
1800 &state.system_configuration,
1801 oracle_write_ts,
1802 session,
1803 tx,
1804 audit_events,
1805 EventType::Drop,
1806 ObjectType::Database,
1807 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1808 id: database_id.to_string(),
1809 name: database.name.clone(),
1810 }),
1811 )?;
1812 }
1813
1814 tx.remove_user_roles(&delta.roles)?;
1816
1817 for role_id in delta.roles {
1818 let role = state
1819 .roles_by_id
1820 .get(&role_id)
1821 .expect("catalog out of sync");
1822
1823 CatalogState::add_to_audit_log(
1824 &state.system_configuration,
1825 oracle_write_ts,
1826 session,
1827 tx,
1828 audit_events,
1829 EventType::Drop,
1830 ObjectType::Role,
1831 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1832 id: role.id.to_string(),
1833 name: role.name.clone(),
1834 }),
1835 )?;
1836 info!("drop role {}", role.name());
1837 }
1838
1839 tx.remove_network_policies(&delta.network_policies)?;
1841
1842 for network_policy_id in delta.network_policies {
1843 let policy = state
1844 .network_policies_by_id
1845 .get(&network_policy_id)
1846 .expect("catalog out of sync");
1847
1848 CatalogState::add_to_audit_log(
1849 &state.system_configuration,
1850 oracle_write_ts,
1851 session,
1852 tx,
1853 audit_events,
1854 EventType::Drop,
1855 ObjectType::NetworkPolicy,
1856 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1857 id: policy.id.to_string(),
1858 name: policy.name.clone(),
1859 }),
1860 )?;
1861 info!("drop network policy {}", policy.name.clone());
1862 }
1863
1864 let replicas = delta.replicas.keys().copied().collect();
1866 tx.remove_cluster_replicas(&replicas)?;
1867
1868 for (replica_id, (cluster_id, reason)) in delta.replicas {
1869 let cluster = state.get_cluster(cluster_id);
1870 let replica = cluster.replica(replica_id).expect("Must exist");
1871
1872 let (reason, scheduling_policies) = reason.into_audit_log();
1873 let details =
1874 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1875 cluster_id: cluster_id.to_string(),
1876 cluster_name: cluster.name.clone(),
1877 replica_id: Some(replica_id.to_string()),
1878 replica_name: replica.name.clone(),
1879 reason,
1880 scheduling_policies,
1881 });
1882 CatalogState::add_to_audit_log(
1883 &state.system_configuration,
1884 oracle_write_ts,
1885 session,
1886 tx,
1887 audit_events,
1888 EventType::Drop,
1889 ObjectType::ClusterReplica,
1890 details,
1891 )?;
1892 }
1893
1894 tx.remove_clusters(&delta.clusters)?;
1896
1897 for cluster_id in delta.clusters {
1898 let cluster = state.get_cluster(cluster_id);
1899
1900 CatalogState::add_to_audit_log(
1901 &state.system_configuration,
1902 oracle_write_ts,
1903 session,
1904 tx,
1905 audit_events,
1906 EventType::Drop,
1907 ObjectType::Cluster,
1908 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1909 id: cluster.id.to_string(),
1910 name: cluster.name.clone(),
1911 }),
1912 )?;
1913 }
1914 }
1915 Op::GrantRole {
1916 role_id,
1917 member_id,
1918 grantor_id,
1919 } => {
1920 state.ensure_not_reserved_role(&member_id)?;
1921 state.ensure_grantable_role(&role_id)?;
1922 if state.collect_role_membership(&role_id).contains(&member_id) {
1923 let group_role = state.get_role(&role_id);
1924 let member_role = state.get_role(&member_id);
1925 return Err(AdapterError::Catalog(Error::new(
1926 ErrorKind::CircularRoleMembership {
1927 role_name: group_role.name().to_string(),
1928 member_name: member_role.name().to_string(),
1929 },
1930 )));
1931 }
1932 let mut member_role = state.get_role(&member_id).clone();
1933 member_role.membership.map.insert(role_id, grantor_id);
1934 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1935
1936 CatalogState::add_to_audit_log(
1937 &state.system_configuration,
1938 oracle_write_ts,
1939 session,
1940 tx,
1941 audit_events,
1942 EventType::Grant,
1943 ObjectType::Role,
1944 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1945 role_id: role_id.to_string(),
1946 member_id: member_id.to_string(),
1947 grantor_id: grantor_id.to_string(),
1948 executed_by: session
1949 .map(|session| session.authenticated_role_id())
1950 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1951 .to_string(),
1952 }),
1953 )?;
1954 }
1955 Op::RevokeRole {
1956 role_id,
1957 member_id,
1958 grantor_id,
1959 } => {
1960 state.ensure_not_reserved_role(&member_id)?;
1961 state.ensure_grantable_role(&role_id)?;
1962 let mut member_role = state.get_role(&member_id).clone();
1963 member_role.membership.map.remove(&role_id);
1964 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1965
1966 CatalogState::add_to_audit_log(
1967 &state.system_configuration,
1968 oracle_write_ts,
1969 session,
1970 tx,
1971 audit_events,
1972 EventType::Revoke,
1973 ObjectType::Role,
1974 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1975 role_id: role_id.to_string(),
1976 member_id: member_id.to_string(),
1977 grantor_id: grantor_id.to_string(),
1978 executed_by: session
1979 .map(|session| session.authenticated_role_id())
1980 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1981 .to_string(),
1982 }),
1983 )?;
1984 }
1985 Op::UpdatePrivilege {
1986 target_id,
1987 privilege,
1988 variant,
1989 } => {
1990 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1991 UpdatePrivilegeVariant::Grant => {
1992 privileges.grant(privilege);
1993 }
1994 UpdatePrivilegeVariant::Revoke => {
1995 privileges.revoke(&privilege);
1996 }
1997 };
1998 match &target_id {
1999 SystemObjectId::Object(object_id) => match object_id {
2000 ObjectId::Cluster(id) => {
2001 let mut cluster = state.get_cluster(*id).clone();
2002 update_privilege_fn(&mut cluster.privileges);
2003 tx.update_cluster(*id, cluster.into())?;
2004 }
2005 ObjectId::Database(id) => {
2006 let mut database = state.get_database(id).clone();
2007 update_privilege_fn(&mut database.privileges);
2008 tx.update_database(*id, database.into())?;
2009 }
2010 ObjectId::NetworkPolicy(id) => {
2011 let mut policy = state.get_network_policy(id).clone();
2012 update_privilege_fn(&mut policy.privileges);
2013 tx.update_network_policy(*id, policy.into())?;
2014 }
2015 ObjectId::Schema((database_spec, schema_spec)) => {
2016 let schema_id = schema_spec.clone().into();
2017 let mut schema = state
2018 .get_schema(
2019 database_spec,
2020 schema_spec,
2021 session
2022 .map(|session| session.conn_id())
2023 .unwrap_or(&SYSTEM_CONN_ID),
2024 )
2025 .clone();
2026 update_privilege_fn(&mut schema.privileges);
2027 tx.update_schema(schema_id, schema.into())?;
2028 }
2029 ObjectId::Item(id) => {
2030 let entry = state.get_entry(id);
2031 let mut new_entry = entry.clone();
2032 update_privilege_fn(&mut new_entry.privileges);
2033 if !new_entry.item().is_temporary() {
2034 tx.update_item(*id, new_entry.into())?;
2035 } else {
2036 temporary_item_updates
2037 .push((entry.clone().into(), StateDiff::Retraction));
2038 temporary_item_updates
2039 .push((new_entry.into(), StateDiff::Addition));
2040 }
2041 }
2042 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
2043 },
2044 SystemObjectId::System => {
2045 let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
2046 update_privilege_fn(&mut system_privileges);
2047 let new_privilege =
2048 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
2049 tx.set_system_privilege(
2050 privilege.grantee,
2051 privilege.grantor,
2052 new_privilege.map(|new_privilege| new_privilege.acl_mode),
2053 )?;
2054 }
2055 }
2056 let object_type = state.get_system_object_type(&target_id);
2057 let object_id_str = match &target_id {
2058 SystemObjectId::System => "SYSTEM".to_string(),
2059 SystemObjectId::Object(id) => id.to_string(),
2060 };
2061 CatalogState::add_to_audit_log(
2062 &state.system_configuration,
2063 oracle_write_ts,
2064 session,
2065 tx,
2066 audit_events,
2067 variant.into(),
2068 system_object_type_to_audit_object_type(&object_type),
2069 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
2070 object_id: object_id_str,
2071 grantee_id: privilege.grantee.to_string(),
2072 grantor_id: privilege.grantor.to_string(),
2073 privileges: privilege.acl_mode.to_string(),
2074 }),
2075 )?;
2076 }
2077 Op::UpdateDefaultPrivilege {
2078 privilege_object,
2079 privilege_acl_item,
2080 variant,
2081 } => {
2082 let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
2083 match variant {
2084 UpdatePrivilegeVariant::Grant => default_privileges
2085 .grant(privilege_object.clone(), privilege_acl_item.clone()),
2086 UpdatePrivilegeVariant::Revoke => {
2087 default_privileges.revoke(&privilege_object, &privilege_acl_item)
2088 }
2089 }
2090 let new_acl_mode = default_privileges
2091 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
2092 tx.set_default_privilege(
2093 privilege_object.role_id,
2094 privilege_object.database_id,
2095 privilege_object.schema_id,
2096 privilege_object.object_type,
2097 privilege_acl_item.grantee,
2098 new_acl_mode.cloned(),
2099 )?;
2100 CatalogState::add_to_audit_log(
2101 &state.system_configuration,
2102 oracle_write_ts,
2103 session,
2104 tx,
2105 audit_events,
2106 variant.into(),
2107 object_type_to_audit_object_type(privilege_object.object_type),
2108 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
2109 role_id: privilege_object.role_id.to_string(),
2110 database_id: privilege_object.database_id.map(|id| id.to_string()),
2111 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
2112 grantee_id: privilege_acl_item.grantee.to_string(),
2113 privileges: privilege_acl_item.acl_mode.to_string(),
2114 }),
2115 )?;
2116 }
2117 Op::RenameCluster {
2118 id,
2119 name,
2120 to_name,
2121 check_reserved_names,
2122 } => {
2123 if id.is_system() {
2124 return Err(AdapterError::Catalog(Error::new(
2125 ErrorKind::ReadOnlyCluster(name.clone()),
2126 )));
2127 }
2128 if check_reserved_names && is_reserved_name(&to_name) {
2129 return Err(AdapterError::Catalog(Error::new(
2130 ErrorKind::ReservedClusterName(to_name),
2131 )));
2132 }
2133 tx.rename_cluster(id, &name, &to_name)?;
2134 CatalogState::add_to_audit_log(
2135 &state.system_configuration,
2136 oracle_write_ts,
2137 session,
2138 tx,
2139 audit_events,
2140 EventType::Alter,
2141 ObjectType::Cluster,
2142 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2143 id: id.to_string(),
2144 old_name: name.clone(),
2145 new_name: to_name.clone(),
2146 }),
2147 )?;
2148 info!("rename cluster {name} to {to_name}");
2149 }
2150 Op::RenameClusterReplica {
2151 cluster_id,
2152 replica_id,
2153 name,
2154 to_name,
2155 } => {
2156 if is_reserved_name(&to_name) {
2157 return Err(AdapterError::Catalog(Error::new(
2158 ErrorKind::ReservedReplicaName(to_name),
2159 )));
2160 }
2161 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2162 CatalogState::add_to_audit_log(
2163 &state.system_configuration,
2164 oracle_write_ts,
2165 session,
2166 tx,
2167 audit_events,
2168 EventType::Alter,
2169 ObjectType::ClusterReplica,
2170 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2171 cluster_id: cluster_id.to_string(),
2172 replica_id: replica_id.to_string(),
2173 old_name: name.replica.as_str().to_string(),
2174 new_name: to_name.clone(),
2175 }),
2176 )?;
2177 info!("rename cluster replica {name} to {to_name}");
2178 }
2179 Op::RenameItem {
2180 id,
2181 to_name,
2182 current_full_name,
2183 } => {
2184 let mut updates = Vec::new();
2185
2186 let entry = state.get_entry(&id);
2187 if let CatalogItem::Type(_) = entry.item() {
2188 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2189 current_full_name.to_string(),
2190 ))));
2191 }
2192
2193 if entry.id().is_system() {
2194 let name = state
2195 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2196 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2197 name.to_string(),
2198 ))));
2199 }
2200
2201 let mut to_full_name = current_full_name.clone();
2202 to_full_name.item.clone_from(&to_name);
2203
2204 let mut to_qualified_name = entry.name().clone();
2205 to_qualified_name.item.clone_from(&to_name);
2206
2207 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2208 id: id.to_string(),
2209 old_name: Self::full_name_detail(¤t_full_name),
2210 new_name: Self::full_name_detail(&to_full_name),
2211 });
2212 if Self::should_audit_log_item(entry.item()) {
2213 CatalogState::add_to_audit_log(
2214 &state.system_configuration,
2215 oracle_write_ts,
2216 session,
2217 tx,
2218 audit_events,
2219 EventType::Alter,
2220 catalog_type_to_audit_object_type(entry.item().typ()),
2221 details,
2222 )?;
2223 }
2224
2225 let mut new_entry = entry.clone();
2227 new_entry.name.item.clone_from(&to_name);
2228 new_entry.item = entry
2229 .item()
2230 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2231 .map_err(|e| {
2232 Error::new(ErrorKind::from(AmbiguousRename {
2233 depender: state
2234 .resolve_full_name(entry.name(), entry.conn_id())
2235 .to_string(),
2236 dependee: state
2237 .resolve_full_name(entry.name(), entry.conn_id())
2238 .to_string(),
2239 message: e,
2240 }))
2241 })?;
2242
2243 for id in entry.referenced_by() {
2244 let dependent_item = state.get_entry(id);
2245 let mut to_entry = dependent_item.clone();
2246 to_entry.item = dependent_item
2247 .item()
2248 .rename_item_refs(
2249 current_full_name.clone(),
2250 to_full_name.item.clone(),
2251 false,
2252 )
2253 .map_err(|e| {
2254 Error::new(ErrorKind::from(AmbiguousRename {
2255 depender: state
2256 .resolve_full_name(
2257 dependent_item.name(),
2258 dependent_item.conn_id(),
2259 )
2260 .to_string(),
2261 dependee: state
2262 .resolve_full_name(entry.name(), entry.conn_id())
2263 .to_string(),
2264 message: e,
2265 }))
2266 })?;
2267
2268 if !to_entry.item().is_temporary() {
2269 tx.update_item(*id, to_entry.into())?;
2270 } else {
2271 temporary_item_updates
2272 .push((dependent_item.clone().into(), StateDiff::Retraction));
2273 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2274 }
2275 updates.push(*id);
2276 }
2277 if !new_entry.item().is_temporary() {
2278 tx.update_item(id, new_entry.into())?;
2279 } else {
2280 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2281 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2282 }
2283
2284 updates.push(id);
2285 for id in updates {
2286 Self::log_update(state, &id);
2287 }
2288 }
2289 Op::RenameSchema {
2290 database_spec,
2291 schema_spec,
2292 new_name,
2293 check_reserved_names,
2294 } => {
2295 if check_reserved_names && is_reserved_name(&new_name) {
2296 return Err(AdapterError::Catalog(Error::new(
2297 ErrorKind::ReservedSchemaName(new_name),
2298 )));
2299 }
2300
2301 let conn_id = session
2302 .map(|session| session.conn_id())
2303 .unwrap_or(&SYSTEM_CONN_ID);
2304
2305 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2306 let cur_name = schema.name().schema.clone();
2307
2308 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2309 return Err(AdapterError::Catalog(Error::new(
2310 ErrorKind::AmbientSchemaRename(cur_name),
2311 )));
2312 };
2313 let database = state.get_database(&database_id);
2314 let database_name = &database.name;
2315
2316 let mut updates = Vec::new();
2317 let mut items_to_update = BTreeMap::new();
2318
2319 let mut update_item = |id| {
2320 if items_to_update.contains_key(id) {
2321 return Ok(());
2322 }
2323
2324 let entry = state.get_entry(id);
2325
2326 let mut new_entry = entry.clone();
2328 new_entry.item = entry
2329 .item
2330 .rename_schema_refs(database_name, &cur_name, &new_name)
2331 .map_err(|(s, _i)| {
2332 Error::new(ErrorKind::from(AmbiguousRename {
2333 depender: state
2334 .resolve_full_name(entry.name(), entry.conn_id())
2335 .to_string(),
2336 dependee: format!("{database_name}.{cur_name}"),
2337 message: format!("ambiguous reference to schema named {s}"),
2338 }))
2339 })?;
2340
2341 if !new_entry.item().is_temporary() {
2343 items_to_update.insert(*id, new_entry.into());
2344 } else {
2345 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2346 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2347 }
2348 updates.push(id);
2349
2350 Ok::<_, AdapterError>(())
2351 };
2352
2353 for (_name, item_id) in &schema.items {
2355 update_item(item_id)?;
2357
2358 for id in state.get_entry(item_id).referenced_by() {
2360 update_item(id)?;
2361 }
2362 }
2363 tx.update_items(items_to_update)?;
2366
2367 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2369 let schema_name = schema.name().schema.clone();
2370 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2371 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2372 )));
2373 };
2374
2375 let database_name = database_spec
2377 .id()
2378 .map(|id| state.get_database(&id).name.clone());
2379 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2380 id: schema_id.to_string(),
2381 old_name: schema.name().schema.clone(),
2382 new_name: new_name.clone(),
2383 database_name,
2384 });
2385 CatalogState::add_to_audit_log(
2386 &state.system_configuration,
2387 oracle_write_ts,
2388 session,
2389 tx,
2390 audit_events,
2391 EventType::Alter,
2392 mz_audit_log::ObjectType::Schema,
2393 details,
2394 )?;
2395
2396 let mut new_schema = schema.clone();
2398 new_schema.name.schema.clone_from(&new_name);
2399 tx.update_schema(schema_id, new_schema.into())?;
2400
2401 for id in updates {
2402 Self::log_update(state, id);
2403 }
2404 }
2405 Op::UpdateOwner { id, new_owner } => {
2406 let conn_id = session
2407 .map(|session| session.conn_id())
2408 .unwrap_or(&SYSTEM_CONN_ID);
2409 let old_owner = state
2410 .get_owner_id(&id, conn_id)
2411 .expect("cannot update the owner of an object without an owner");
2412 match &id {
2413 ObjectId::Cluster(id) => {
2414 let mut cluster = state.get_cluster(*id).clone();
2415 if id.is_system() {
2416 return Err(AdapterError::Catalog(Error::new(
2417 ErrorKind::ReadOnlyCluster(cluster.name),
2418 )));
2419 }
2420 Self::update_privilege_owners(
2421 &mut cluster.privileges,
2422 cluster.owner_id,
2423 new_owner,
2424 );
2425 cluster.owner_id = new_owner;
2426 tx.update_cluster(*id, cluster.into())?;
2427 }
2428 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2429 let cluster = state.get_cluster(*cluster_id);
2430 let mut replica = cluster
2431 .replica(*replica_id)
2432 .expect("catalog out of sync")
2433 .clone();
2434 if replica_id.is_system() {
2435 return Err(AdapterError::Catalog(Error::new(
2436 ErrorKind::ReadOnlyClusterReplica(replica.name),
2437 )));
2438 }
2439 replica.owner_id = new_owner;
2440 tx.update_cluster_replica(*replica_id, replica.into())?;
2441 }
2442 ObjectId::Database(id) => {
2443 let mut database = state.get_database(id).clone();
2444 if id.is_system() {
2445 return Err(AdapterError::Catalog(Error::new(
2446 ErrorKind::ReadOnlyDatabase(database.name),
2447 )));
2448 }
2449 Self::update_privilege_owners(
2450 &mut database.privileges,
2451 database.owner_id,
2452 new_owner,
2453 );
2454 database.owner_id = new_owner;
2455 tx.update_database(*id, database.clone().into())?;
2456 }
2457 ObjectId::Schema((database_spec, schema_spec)) => {
2458 let schema_id: SchemaId = schema_spec.clone().into();
2459 let mut schema = state
2460 .get_schema(database_spec, schema_spec, conn_id)
2461 .clone();
2462 if schema_id.is_system() {
2463 let name = schema.name();
2464 let full_name = state.resolve_full_schema_name(name);
2465 return Err(AdapterError::Catalog(Error::new(
2466 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2467 )));
2468 }
2469 Self::update_privilege_owners(
2470 &mut schema.privileges,
2471 schema.owner_id,
2472 new_owner,
2473 );
2474 schema.owner_id = new_owner;
2475 tx.update_schema(schema_id, schema.into())?;
2476 }
2477 ObjectId::Item(id) => {
2478 let entry = state.get_entry(id);
2479 let mut new_entry = entry.clone();
2480 if id.is_system() {
2481 let full_name = state.resolve_full_name(
2482 new_entry.name(),
2483 session.map(|session| session.conn_id()),
2484 );
2485 return Err(AdapterError::Catalog(Error::new(
2486 ErrorKind::ReadOnlyItem(full_name.to_string()),
2487 )));
2488 }
2489 Self::update_privilege_owners(
2490 &mut new_entry.privileges,
2491 new_entry.owner_id,
2492 new_owner,
2493 );
2494 new_entry.owner_id = new_owner;
2495 if !new_entry.item().is_temporary() {
2496 tx.update_item(*id, new_entry.into())?;
2497 } else {
2498 temporary_item_updates
2499 .push((entry.clone().into(), StateDiff::Retraction));
2500 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2501 }
2502 }
2503 ObjectId::NetworkPolicy(id) => {
2504 let mut policy = state.get_network_policy(id).clone();
2505 if id.is_system() {
2506 return Err(AdapterError::Catalog(Error::new(
2507 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2508 )));
2509 }
2510 Self::update_privilege_owners(
2511 &mut policy.privileges,
2512 policy.owner_id,
2513 new_owner,
2514 );
2515 policy.owner_id = new_owner;
2516 tx.update_network_policy(*id, policy.into())?;
2517 }
2518 ObjectId::Role(_) => unreachable!("roles have no owner"),
2519 }
2520 let object_type = state.get_object_type(&id);
2521 CatalogState::add_to_audit_log(
2522 &state.system_configuration,
2523 oracle_write_ts,
2524 session,
2525 tx,
2526 audit_events,
2527 EventType::Alter,
2528 object_type_to_audit_object_type(object_type),
2529 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2530 object_id: id.to_string(),
2531 old_owner_id: old_owner.to_string(),
2532 new_owner_id: new_owner.to_string(),
2533 }),
2534 )?;
2535 }
2536 Op::UpdateClusterConfig { id, name, config } => {
2537 let mut cluster = state.get_cluster(id).clone();
2538 cluster.config = config;
2539 tx.update_cluster(id, cluster.into())?;
2540 info!("update cluster {}", name);
2541
2542 CatalogState::add_to_audit_log(
2543 &state.system_configuration,
2544 oracle_write_ts,
2545 session,
2546 tx,
2547 audit_events,
2548 EventType::Alter,
2549 ObjectType::Cluster,
2550 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2551 id: id.to_string(),
2552 name,
2553 }),
2554 )?;
2555 }
2556 Op::UpdateClusterReplicaConfig {
2557 replica_id,
2558 cluster_id,
2559 config,
2560 } => {
2561 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2562 info!("update replica {}", replica.name);
2563 tx.update_cluster_replica(
2564 replica_id,
2565 mz_catalog::durable::ClusterReplica {
2566 cluster_id,
2567 replica_id,
2568 name: replica.name.clone(),
2569 config: config.clone().into(),
2570 owner_id: replica.owner_id,
2571 },
2572 )?;
2573 }
2574 Op::UpdateItem { id, name, to_item } => {
2575 let mut entry = state.get_entry(&id).clone();
2576 entry.name = name.clone();
2577 entry.item = to_item.clone();
2578 tx.update_item(id, entry.into())?;
2579
2580 if Self::should_audit_log_item(&to_item) {
2581 let mut full_name = Self::full_name_detail(
2582 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2583 );
2584 full_name.item = name.item;
2585
2586 CatalogState::add_to_audit_log(
2587 &state.system_configuration,
2588 oracle_write_ts,
2589 session,
2590 tx,
2591 audit_events,
2592 EventType::Alter,
2593 catalog_type_to_audit_object_type(to_item.typ()),
2594 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2595 id: id.to_string(),
2596 name: full_name,
2597 }),
2598 )?;
2599 }
2600
2601 Self::log_update(state, &id);
2602 }
2603 Op::UpdateSystemConfiguration { name, value } => {
2604 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2605 tx.upsert_system_config(&name, parsed_value.clone())?;
2606 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2611 let with_0dt_deployment_max_wait =
2612 Duration::parse(VarInput::Flat(&parsed_value))
2613 .expect("parsing succeeded above");
2614 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2615 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2616 let with_0dt_deployment_ddl_check_interval =
2617 Duration::parse(VarInput::Flat(&parsed_value))
2618 .expect("parsing succeeded above");
2619 tx.set_0dt_deployment_ddl_check_interval(
2620 with_0dt_deployment_ddl_check_interval,
2621 )?;
2622 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2623 let panic_after_timeout =
2624 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2625 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2626 }
2627
2628 CatalogState::add_to_audit_log(
2629 &state.system_configuration,
2630 oracle_write_ts,
2631 session,
2632 tx,
2633 audit_events,
2634 EventType::Alter,
2635 ObjectType::System,
2636 EventDetails::SetV1(mz_audit_log::SetV1 {
2637 name,
2638 value: Some(value.borrow().to_vec().join(", ")),
2639 }),
2640 )?;
2641 }
2642 Op::ResetSystemConfiguration { name } => {
2643 tx.remove_system_config(&name);
2644 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2649 tx.reset_0dt_deployment_max_wait()?;
2650 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2651 tx.reset_0dt_deployment_ddl_check_interval()?;
2652 }
2653
2654 CatalogState::add_to_audit_log(
2655 &state.system_configuration,
2656 oracle_write_ts,
2657 session,
2658 tx,
2659 audit_events,
2660 EventType::Alter,
2661 ObjectType::System,
2662 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2663 )?;
2664 }
2665 Op::ResetAllSystemConfiguration => {
2666 tx.clear_system_configs();
2667 tx.reset_0dt_deployment_max_wait()?;
2668 tx.reset_0dt_deployment_ddl_check_interval()?;
2669
2670 CatalogState::add_to_audit_log(
2671 &state.system_configuration,
2672 oracle_write_ts,
2673 session,
2674 tx,
2675 audit_events,
2676 EventType::Alter,
2677 ObjectType::System,
2678 EventDetails::ResetAllV1,
2679 )?;
2680 }
2681 Op::WeirdStorageUsageUpdates {
2682 object_id,
2683 size_bytes,
2684 collection_timestamp,
2685 } => {
2686 let id = tx.allocate_storage_usage_ids()?;
2687 let metric =
2688 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2689 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2690 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2691 weird_builtin_table_update = Some(builtin_table_update);
2692 }
2693 Op::InjectAuditEvents { events } => {
2694 for event in events {
2695 let id = tx.allocate_audit_log_id()?;
2696 let ev = VersionedEvent::new(
2697 id,
2698 event.event_type,
2699 event.object_type,
2700 event.details,
2701 event.user,
2702 oracle_write_ts.into(),
2703 );
2704 audit_events.push(ev.clone());
2705 tx.insert_audit_log_event(ev);
2706 }
2707 }
2708 };
2709 Ok((weird_builtin_table_update, temporary_item_updates))
2710 }
2711
2712 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2713 let entry = state.get_entry(id);
2714 info!(
2715 "update {} {} ({})",
2716 entry.item_type(),
2717 state.resolve_full_name(entry.name(), entry.conn_id()),
2718 id
2719 );
2720 }
2721
2722 fn update_privilege_owners(
2726 privileges: &mut PrivilegeMap,
2727 old_owner: RoleId,
2728 new_owner: RoleId,
2729 ) {
2730 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2732
2733 let mut new_present = false;
2734 for privilege in flat_privileges.iter_mut() {
2735 if privilege.grantor == old_owner {
2738 privilege.grantor = new_owner;
2739 } else if privilege.grantor == new_owner {
2740 new_present = true;
2741 }
2742 if privilege.grantee == old_owner {
2744 privilege.grantee = new_owner;
2745 } else if privilege.grantee == new_owner {
2746 new_present = true;
2747 }
2748 }
2749
2750 if new_present {
2754 let privilege_map: BTreeMap<_, Vec<_>> =
2756 flat_privileges
2757 .into_iter()
2758 .fold(BTreeMap::new(), |mut accum, privilege| {
2759 accum
2760 .entry((privilege.grantee, privilege.grantor))
2761 .or_default()
2762 .push(privilege);
2763 accum
2764 });
2765
2766 flat_privileges = privilege_map
2768 .into_iter()
2769 .map(|((grantee, grantor), values)|
2770 values.into_iter().fold(
2772 MzAclItem::empty(grantee, grantor),
2773 |mut accum, mz_aclitem| {
2774 accum.acl_mode =
2775 accum.acl_mode.union(mz_aclitem.acl_mode);
2776 accum
2777 },
2778 ))
2779 .collect();
2780 }
2781
2782 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2783 }
2784}
2785
2786fn tx_replace_item(
2795 tx: &mut Transaction<'_>,
2796 state: &CatalogState,
2797 id: CatalogItemId,
2798 new_entry: CatalogEntry,
2799) -> Result<(), AdapterError> {
2800 let new_id = new_entry.id;
2801
2802 for use_id in new_entry.referenced_by() {
2804 if tx.get_item(use_id).is_none() {
2806 continue;
2807 }
2808
2809 let mut dependent = state.get_entry(use_id).clone();
2810 dependent.item = dependent.item.replace_item_refs(id, new_id);
2811 tx.update_item(*use_id, dependent.into())?;
2812 }
2813
2814 let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2816 let new_comment_id = new_entry.comment_object_id();
2817 if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2818 tx.drop_comments(&[old_comment_id].into())?;
2819 for (sub, comment) in comments {
2820 tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2821 }
2822 }
2823
2824 let mz_catalog::durable::Item {
2825 id: _,
2826 oid,
2827 global_id,
2828 schema_id,
2829 name,
2830 create_sql,
2831 owner_id,
2832 privileges,
2833 extra_versions,
2834 } = new_entry.into();
2835
2836 tx.remove_item(id)?;
2837 tx.insert_item(
2838 new_id,
2839 oid,
2840 global_id,
2841 schema_id,
2842 &name,
2843 create_sql,
2844 owner_id,
2845 privileges,
2846 extra_versions,
2847 )?;
2848
2849 Ok(())
2850}
2851
2852fn apply_replacement_audit_events(
2854 state: &CatalogState,
2855 target: &CatalogEntry,
2856 replacement: &CatalogEntry,
2857) -> Vec<(EventType, EventDetails)> {
2858 let mut events = Vec::new();
2859
2860 let target_name = state.resolve_full_name(target.name(), target.conn_id());
2861 let target_id_name = IdFullNameV1 {
2862 id: target.id().to_string(),
2863 name: Catalog::full_name_detail(&target_name),
2864 };
2865 let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2866 let replacement_id_name = IdFullNameV1 {
2867 id: replacement.id().to_string(),
2868 name: Catalog::full_name_detail(&replacement_name),
2869 };
2870
2871 if Catalog::should_audit_log_item(&replacement.item) {
2872 events.push((
2873 EventType::Drop,
2874 EventDetails::IdFullNameV1(replacement_id_name.clone()),
2875 ));
2876 }
2877
2878 if Catalog::should_audit_log_item(&target.item) {
2879 events.push((
2880 EventType::Alter,
2881 EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2882 target: target_id_name.clone(),
2883 replacement: replacement_id_name,
2884 }),
2885 ));
2886
2887 if let Some(old_cluster_id) = target.cluster_id()
2888 && let Some(new_cluster_id) = replacement.cluster_id()
2889 && old_cluster_id != new_cluster_id
2890 {
2891 events.push((
2894 EventType::Alter,
2895 EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2896 id: replacement.id().to_string(),
2897 name: target_id_name.name,
2898 old_cluster_id: old_cluster_id.to_string(),
2899 new_cluster_id: new_cluster_id.to_string(),
2900 }),
2901 ));
2902 }
2903 }
2904
2905 events
2906}
2907
2908#[derive(Debug, Default)]
2916pub(crate) struct ObjectsToDrop {
2917 pub comments: BTreeSet<CommentObjectId>,
2918 pub databases: BTreeSet<DatabaseId>,
2919 pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2920 pub clusters: BTreeSet<ClusterId>,
2921 pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2922 pub roles: BTreeSet<RoleId>,
2923 pub items: Vec<CatalogItemId>,
2924 pub network_policies: BTreeSet<NetworkPolicyId>,
2925}
2926
2927impl ObjectsToDrop {
2928 pub fn generate(
2929 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2930 state: &CatalogState,
2931 session: Option<&ConnMeta>,
2932 ) -> Result<Self, AdapterError> {
2933 let mut delta = ObjectsToDrop::default();
2934
2935 for drop_object_info in drop_object_infos {
2936 delta.add_item(drop_object_info, state, session)?;
2937 }
2938
2939 Ok(delta)
2940 }
2941
2942 fn add_item(
2943 &mut self,
2944 drop_object_info: DropObjectInfo,
2945 state: &CatalogState,
2946 session: Option<&ConnMeta>,
2947 ) -> Result<(), AdapterError> {
2948 self.comments
2949 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2950
2951 match drop_object_info {
2952 DropObjectInfo::Database(database_id) => {
2953 let database = &state.database_by_id[&database_id];
2954 if database_id.is_system() {
2955 return Err(AdapterError::Catalog(Error::new(
2956 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2957 )));
2958 }
2959
2960 self.databases.insert(database_id);
2961 }
2962 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2963 let schema = state.get_schema(
2964 &database_spec,
2965 &schema_spec,
2966 session
2967 .map(|session| session.conn_id())
2968 .unwrap_or(&SYSTEM_CONN_ID),
2969 );
2970 let schema_id: SchemaId = schema_spec.into();
2971 if schema_id.is_system() {
2972 let name = schema.name();
2973 let full_name = state.resolve_full_schema_name(name);
2974 return Err(AdapterError::Catalog(Error::new(
2975 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2976 )));
2977 }
2978
2979 self.schemas.insert(schema_spec, database_spec);
2980 }
2981 DropObjectInfo::Role(role_id) => {
2982 let name = state.get_role(&role_id).name().to_string();
2983 if role_id.is_system() || role_id.is_predefined() {
2984 return Err(AdapterError::Catalog(Error::new(
2985 ErrorKind::ReservedRoleName(name.clone()),
2986 )));
2987 }
2988 state.ensure_not_reserved_role(&role_id)?;
2989
2990 self.roles.insert(role_id);
2991 }
2992 DropObjectInfo::Cluster(cluster_id) => {
2993 let cluster = state.get_cluster(cluster_id);
2994 let name = &cluster.name;
2995 if cluster_id.is_system() {
2996 return Err(AdapterError::Catalog(Error::new(
2997 ErrorKind::ReadOnlyCluster(name.clone()),
2998 )));
2999 }
3000
3001 self.clusters.insert(cluster_id);
3002 }
3003 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3004 let cluster = state.get_cluster(cluster_id);
3005 let replica = cluster.replica(replica_id).expect("Must exist");
3006
3007 self.replicas
3008 .insert(replica.replica_id, (cluster.id, reason));
3009
3010 for (_id, entry) in state.get_entries() {
3014 if let CatalogItem::MaterializedView(mv) = entry.item() {
3015 if mv.target_replica == Some(replica_id)
3016 && !self.items.contains(&entry.id())
3017 {
3018 tracing::warn!(
3019 "implicitly dropping materialized view {} because target replica was dropped",
3020 entry.name().item,
3021 );
3022 self.items.push(entry.id());
3023 }
3024 }
3025 }
3026 }
3027 DropObjectInfo::Item(item_id) => {
3028 let entry = state.get_entry(&item_id);
3029 if item_id.is_system() {
3030 let name = entry.name();
3031 let full_name =
3032 state.resolve_full_name(name, session.map(|session| session.conn_id()));
3033 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3034 full_name.to_string(),
3035 ))));
3036 }
3037
3038 self.items.push(item_id);
3039 }
3040 DropObjectInfo::NetworkPolicy(network_policy_id) => {
3041 let policy = state.get_network_policy(&network_policy_id);
3042 let name = &policy.name;
3043 if network_policy_id.is_system() {
3044 return Err(AdapterError::Catalog(Error::new(
3045 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3046 )));
3047 }
3048
3049 self.network_policies.insert(network_policy_id);
3050 }
3051 }
3052
3053 Ok(())
3054 }
3055}
3056
#[cfg(test)]
mod tests {
    use mz_catalog::SYSTEM_CONN_ID;
    use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;
    use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::DEFAULT_SCHEMA;
    use mz_sql::catalog::CatalogDatabase;
    use mz_sql::names::{
        ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
    };
    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;

    use crate::catalog::{Catalog, Op};
    use crate::session::DEFAULT_DATABASE_NAME;

    /// Checks that moving ownership from one role to another rewrites grantors
    /// and grantees, and merges entries that end up with the same
    /// `(grantee, grantor)` pair into one entry with the union of their modes.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        let acl = |grantee, grantor, acl_mode| MzAclItem {
            grantee,
            grantor,
            acl_mode,
        };

        // Each case pairs the initial privileges with the single entry expected
        // once ownership has moved from `old_owner` to `new_owner`.
        let cases = [
            // Old and new owner each granted a privilege to the same role.
            (
                vec![
                    acl(other_role, old_owner, AclMode::UPDATE),
                    acl(other_role, new_owner, AclMode::SELECT),
                ],
                acl(
                    other_role,
                    new_owner,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
            // Old and new owner were each granted a privilege by the same role.
            (
                vec![
                    acl(old_owner, other_role, AclMode::UPDATE),
                    acl(new_owner, other_role, AclMode::SELECT),
                ],
                acl(
                    new_owner,
                    other_role,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
            // Old and new owner each granted a privilege to themselves.
            (
                vec![
                    acl(old_owner, old_owner, AclMode::UPDATE),
                    acl(new_owner, new_owner, AclMode::SELECT),
                ],
                acl(
                    new_owner,
                    new_owner,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
        ];

        for (initial, expected) in cases {
            let mut privileges = PrivilegeMap::from_mz_acl_items(initial);
            Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
            assert_eq!(1, privileges.all_values().count());
            assert_eq!(
                vec![expected],
                privileges.all_values_owned().collect::<Vec<_>>()
            );
        }
    }

    /// Creates two tables through catalog dry runs, once incrementally (the
    /// second dry run starts from the first one's state and snapshot) and once
    /// with both ops in a single dry run, then checks that both approaches
    /// yield entries with the same names and owners.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_transact_incremental_dry_run_processes_only_new_ops() {
        Catalog::with_debug(|catalog| async move {
            // Resolve the default database and schema that will hold the tables.
            let default_db = catalog
                .resolve_database(DEFAULT_DATABASE_NAME)
                .expect("default database");
            let database_name = default_db.name.clone();
            let database_spec = ResolvedDatabaseSpecifier::Id(default_db.id());
            let default_schema = catalog
                .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
                .expect("default schema");
            let schema_name = default_schema.name.schema.clone();
            let schema_spec = default_schema.id.clone();

            let (id_t1, global_id_t1) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t1");
            let (id_t2, global_id_t2) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t2");

            let oracle_write_ts = catalog.current_upper().await;

            // Builds a `CreateItem` op for an empty table in the default schema.
            let make_table_op = |id, global_id, name: &str| {
                let qualifiers = ItemQualifiers {
                    database_spec: database_spec.clone(),
                    schema_spec: schema_spec.clone(),
                };
                let table = Table {
                    create_sql: Some(format!(
                        "CREATE TABLE {database_name}.{schema_name}.{name} ()"
                    )),
                    desc: VersionedRelationDesc::new(RelationDesc::empty()),
                    collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                    conn_id: None,
                    resolved_ids: ResolvedIds::empty(),
                    custom_logical_compaction_window: None,
                    is_retained_metrics_object: false,
                    data_source: TableDataSource::TableWrites { defaults: vec![] },
                };
                Op::CreateItem {
                    id,
                    name: QualifiedItemName {
                        qualifiers,
                        item: name.to_string(),
                    },
                    item: CatalogItem::Table(table),
                    owner_id: MZ_SYSTEM_ROLE_ID,
                }
            };

            let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
            let op_t2 = make_table_op(id_t2, global_id_t2, "t2");

            let base_state = catalog.state().clone();

            // First dry run: only t1, starting from the base state.
            let (state_after_t1, snapshot_after_t1) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("first dry run");

            let t1_after_first = state_after_t1.try_get_entry(&id_t1);
            assert!(
                t1_after_first.is_some(),
                "t1 should exist after first dry run"
            );
            assert_eq!(t1_after_first.expect("t1 entry").name().item, "t1");
            assert!(
                state_after_t1.try_get_entry(&id_t2).is_none(),
                "t2 should NOT exist after first dry run"
            );

            // Second dry run: only t2, continuing from the first dry run's
            // state and snapshot.
            let (state_incremental, _) = catalog
                .transact_incremental_dry_run(
                    &state_after_t1,
                    vec![op_t2.clone()],
                    None,
                    Some(snapshot_after_t1),
                    oracle_write_ts,
                )
                .await
                .expect("second dry run");

            for (id, message) in [
                (&id_t1, "t1 should exist in incremental result"),
                (&id_t2, "t2 should exist in incremental result"),
            ] {
                assert!(state_incremental.try_get_entry(id).is_some(), "{message}");
            }

            // Reference run: both ops in one dry run from the base state.
            let (state_all_at_once, _) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone(), op_t2.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("all-at-once dry run");

            for (id, message) in [
                (&id_t1, "t1 should exist in all-at-once result"),
                (&id_t2, "t2 should exist in all-at-once result"),
            ] {
                assert!(state_all_at_once.try_get_entry(id).is_some(), "{message}");
            }

            // Both approaches must agree on each table's name and owner.
            for (id, incremental_label, all_at_once_label) in [
                (&id_t1, "inc t1", "all t1"),
                (&id_t2, "inc t2", "all t2"),
            ] {
                let incremental = state_incremental
                    .try_get_entry(id)
                    .expect(incremental_label);
                let all_at_once = state_all_at_once
                    .try_get_entry(id)
                    .expect(all_at_once_label);
                assert_eq!(incremental.name(), all_at_once.name());
                assert_eq!(incremental.owner_id, all_at_once.owner_id);
            }

            catalog.expire().await;
        })
        .await
    }
}