1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
34 StateUpdateKind, TemporaryItem,
35};
36use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
37use mz_controller_types::{ClusterId, ReplicaId};
38use mz_ore::collections::HashSet;
39use mz_ore::instrument;
40use mz_ore::now::EpochMillis;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::role_id::RoleId;
45use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
46use mz_sql::ast::RawDataType;
47use mz_sql::catalog::{
48 CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
49 CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
50 RoleAttributesRaw, RoleMembership, RoleVars,
51};
52use mz_sql::names::{
53 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
54 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
55};
56use mz_sql::plan::{NetworkPolicyRule, PlanError};
57use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
58use mz_sql::session::vars::OwnedVarInput;
59use mz_sql::session::vars::{Value as VarValue, VarInput};
60use mz_sql::{DEFAULT_SCHEMA, rbac};
61use mz_sql_parser::ast::{QualifiedReplica, Value};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{info, trace};
64
65use crate::AdapterError;
66use crate::catalog::{
67 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
68 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
69 is_reserved_role_name, object_type_to_audit_object_type,
70 system_object_type_to_audit_object_type,
71};
72use crate::coord::ConnMeta;
73use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
74use crate::coord::cluster_scheduling::SchedulingDecision;
75use crate::util::ResultExt;
76
77#[derive(Debug, Clone)]
78pub enum Op {
79 AlterRetainHistory {
80 id: CatalogItemId,
81 value: Option<Value>,
82 window: CompactionWindow,
83 },
84 AlterRole {
85 id: RoleId,
86 name: String,
87 attributes: RoleAttributesRaw,
88 nopassword: bool,
89 vars: RoleVars,
90 },
91 AlterNetworkPolicy {
92 id: NetworkPolicyId,
93 rules: Vec<NetworkPolicyRule>,
94 name: String,
95 owner_id: RoleId,
96 },
97 AlterAddColumn {
98 id: CatalogItemId,
99 new_global_id: GlobalId,
100 name: ColumnName,
101 typ: SqlColumnType,
102 sql: RawDataType,
103 },
104 AlterMaterializedViewApplyReplacement {
105 id: CatalogItemId,
106 replacement_id: CatalogItemId,
107 },
108 CreateDatabase {
109 name: String,
110 owner_id: RoleId,
111 },
112 CreateSchema {
113 database_id: ResolvedDatabaseSpecifier,
114 schema_name: String,
115 owner_id: RoleId,
116 },
117 CreateRole {
118 name: String,
119 attributes: RoleAttributesRaw,
120 },
121 CreateCluster {
122 id: ClusterId,
123 name: String,
124 introspection_sources: Vec<&'static BuiltinLog>,
125 owner_id: RoleId,
126 config: ClusterConfig,
127 },
128 CreateClusterReplica {
129 cluster_id: ClusterId,
130 name: String,
131 config: ReplicaConfig,
132 owner_id: RoleId,
133 reason: ReplicaCreateDropReason,
134 },
135 CreateItem {
136 id: CatalogItemId,
137 name: QualifiedItemName,
138 item: CatalogItem,
139 owner_id: RoleId,
140 },
141 CreateNetworkPolicy {
142 rules: Vec<NetworkPolicyRule>,
143 name: String,
144 owner_id: RoleId,
145 },
146 Comment {
147 object_id: CommentObjectId,
148 sub_component: Option<usize>,
149 comment: Option<String>,
150 },
151 DropObjects(Vec<DropObjectInfo>),
152 GrantRole {
153 role_id: RoleId,
154 member_id: RoleId,
155 grantor_id: RoleId,
156 },
157 RenameCluster {
158 id: ClusterId,
159 name: String,
160 to_name: String,
161 check_reserved_names: bool,
162 },
163 RenameClusterReplica {
164 cluster_id: ClusterId,
165 replica_id: ReplicaId,
166 name: QualifiedReplica,
167 to_name: String,
168 },
169 RenameItem {
170 id: CatalogItemId,
171 current_full_name: FullItemName,
172 to_name: String,
173 },
174 RenameSchema {
175 database_spec: ResolvedDatabaseSpecifier,
176 schema_spec: SchemaSpecifier,
177 new_name: String,
178 check_reserved_names: bool,
179 },
180 UpdateOwner {
181 id: ObjectId,
182 new_owner: RoleId,
183 },
184 UpdatePrivilege {
185 target_id: SystemObjectId,
186 privilege: MzAclItem,
187 variant: UpdatePrivilegeVariant,
188 },
189 UpdateDefaultPrivilege {
190 privilege_object: DefaultPrivilegeObject,
191 privilege_acl_item: DefaultPrivilegeAclItem,
192 variant: UpdatePrivilegeVariant,
193 },
194 RevokeRole {
195 role_id: RoleId,
196 member_id: RoleId,
197 grantor_id: RoleId,
198 },
199 UpdateClusterConfig {
200 id: ClusterId,
201 name: String,
202 config: ClusterConfig,
203 },
204 UpdateClusterReplicaConfig {
205 cluster_id: ClusterId,
206 replica_id: ReplicaId,
207 config: ReplicaConfig,
208 },
209 UpdateItem {
210 id: CatalogItemId,
211 name: QualifiedItemName,
212 to_item: CatalogItem,
213 },
214 UpdateSourceReferences {
215 source_id: CatalogItemId,
216 references: SourceReferences,
217 },
218 UpdateSystemConfiguration {
219 name: String,
220 value: OwnedVarInput,
221 },
222 ResetSystemConfiguration {
223 name: String,
224 },
225 ResetAllSystemConfiguration,
226 WeirdStorageUsageUpdates {
233 object_id: Option<String>,
234 size_bytes: u64,
235 collection_timestamp: EpochMillis,
236 },
237 TransactionDryRun,
243}
244
245#[derive(Debug, Clone)]
249pub enum DropObjectInfo {
250 Cluster(ClusterId),
251 ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
252 Database(DatabaseId),
253 Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
254 Role(RoleId),
255 Item(CatalogItemId),
256 NetworkPolicy(NetworkPolicyId),
257}
258
259impl DropObjectInfo {
260 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
263 match id {
264 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
265 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
266 cluster_id,
267 replica_id,
268 ReplicaCreateDropReason::Manual,
269 )),
270 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
271 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
272 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
273 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
274 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
275 }
276 }
277
278 fn to_object_id(&self) -> ObjectId {
281 match &self {
282 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
283 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
284 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
285 }
286 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
287 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
288 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
289 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
290 DropObjectInfo::NetworkPolicy(network_policy_id) => {
291 ObjectId::NetworkPolicy(network_policy_id.clone())
292 }
293 }
294 }
295}
296
297#[derive(Debug, Clone)]
299pub enum ReplicaCreateDropReason {
300 Manual,
305 ClusterScheduling(Vec<SchedulingDecision>),
308}
309
310impl ReplicaCreateDropReason {
311 pub fn into_audit_log(
312 self,
313 ) -> (
314 CreateOrDropClusterReplicaReasonV1,
315 Option<SchedulingDecisionsWithReasonsV2>,
316 ) {
317 let (reason, scheduling_policies) = match self {
318 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
319 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
320 CreateOrDropClusterReplicaReasonV1::Schedule,
321 Some(scheduling_decisions),
322 ),
323 };
324 (
325 reason,
326 scheduling_policies
327 .as_ref()
328 .map(SchedulingDecision::reasons_to_audit_log_reasons),
329 )
330 }
331}
332
333pub struct TransactionResult {
334 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
335 pub catalog_updates: Vec<ParsedStateUpdate>,
337 pub audit_events: Vec<VersionedEvent>,
338}
339
340impl Catalog {
341 fn should_audit_log_item(item: &CatalogItem) -> bool {
342 !item.is_temporary()
343 }
344
345 fn temporary_ids(
348 &self,
349 ops: &[Op],
350 temporary_drops: BTreeSet<(&ConnectionId, String)>,
351 ) -> Result<BTreeSet<CatalogItemId>, Error> {
352 let mut creating = BTreeSet::new();
353 let mut temporary_ids = BTreeSet::new();
354 for op in ops.iter() {
355 if let Op::CreateItem {
356 id,
357 name,
358 item,
359 owner_id: _,
360 } = op
361 {
362 if let Some(conn_id) = item.conn_id() {
363 if self.item_exists_in_temp_schemas(conn_id, &name.item)
364 && !temporary_drops.contains(&(conn_id, name.item.clone()))
365 || creating.contains(&(conn_id, &name.item))
366 {
367 return Err(
368 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
369 );
370 } else {
371 creating.insert((conn_id, &name.item));
372 temporary_ids.insert(id.clone());
373 }
374 }
375 }
376 }
377 Ok(temporary_ids)
378 }
379
380 #[instrument(name = "catalog::transact")]
381 pub async fn transact(
382 &mut self,
383 storage_collections: Option<
386 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
387 >,
388 oracle_write_ts: mz_repr::Timestamp,
389 session: Option<&ConnMeta>,
390 ops: Vec<Op>,
391 ) -> Result<TransactionResult, AdapterError> {
392 trace!("transact: {:?}", ops);
393 fail::fail_point!("catalog_transact", |arg| {
394 Err(AdapterError::Unstructured(anyhow::anyhow!(
395 "failpoint: {arg:?}"
396 )))
397 });
398
399 let drop_ids: BTreeSet<CatalogItemId> = ops
400 .iter()
401 .filter_map(|op| match op {
402 Op::DropObjects(drop_object_infos) => {
403 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
404 let item_ids = ids.filter_map(|id| match id {
405 ObjectId::Item(id) => Some(id),
406 _ => None,
407 });
408 Some(item_ids)
409 }
410 _ => None,
411 })
412 .flatten()
413 .collect();
414 let temporary_drops = drop_ids
415 .iter()
416 .filter_map(|id| {
417 let entry = self.get_entry(id);
418 match entry.item().conn_id() {
419 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
420 None => None,
421 }
422 })
423 .collect();
424 let dropped_global_ids = drop_ids
425 .iter()
426 .flat_map(|item_id| self.get_global_ids(item_id))
427 .collect();
428
429 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
430 let mut builtin_table_updates = vec![];
431 let mut catalog_updates = vec![];
432 let mut audit_events = vec![];
433 let mut storage = self.storage().await;
434 let mut tx = storage
435 .transaction()
436 .await
437 .unwrap_or_terminate("starting catalog transaction");
438
439 let new_state = Self::transact_inner(
440 storage_collections,
441 oracle_write_ts,
442 session,
443 ops,
444 temporary_ids,
445 &mut builtin_table_updates,
446 &mut catalog_updates,
447 &mut audit_events,
448 &mut tx,
449 &self.state,
450 )
451 .await?;
452
453 tx.commit(oracle_write_ts)
458 .await
459 .unwrap_or_terminate("catalog storage transaction commit must succeed");
460
461 drop(storage);
464 if let Some(new_state) = new_state {
465 self.transient_revision += 1;
466 self.state = new_state;
467 }
468
469 let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
471 if self.state.system_config().enable_mz_notices() {
472 self.state().pack_optimizer_notices(
474 &mut builtin_table_updates,
475 dropped_notices.iter(),
476 Diff::MINUS_ONE,
477 );
478 }
479
480 Ok(TransactionResult {
481 builtin_table_updates,
482 catalog_updates,
483 audit_events,
484 })
485 }
486
487 #[instrument(name = "catalog::transact_inner")]
495 async fn transact_inner(
496 storage_collections: Option<
497 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
498 >,
499 oracle_write_ts: mz_repr::Timestamp,
500 session: Option<&ConnMeta>,
501 mut ops: Vec<Op>,
502 temporary_ids: BTreeSet<CatalogItemId>,
503 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
504 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
505 audit_events: &mut Vec<VersionedEvent>,
506 tx: &mut Transaction<'_>,
507 state: &CatalogState,
508 ) -> Result<Option<CatalogState>, AdapterError> {
509 let mut preliminary_state = Cow::Borrowed(state);
536
537 let mut state = Cow::Borrowed(state);
539
540 let dry_run_ops = match ops.last() {
541 Some(Op::TransactionDryRun) => {
542 ops.pop();
544 assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
545 ops.clone()
546 }
547 Some(_) => vec![],
548 None => return Ok(None),
549 };
550
551 let mut storage_collections_to_create = BTreeSet::new();
552 let mut storage_collections_to_drop = BTreeSet::new();
553 let mut storage_collections_to_register = BTreeMap::new();
554
555 let mut updates = Vec::new();
556
557 for op in ops {
558 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
559 oracle_write_ts,
560 session,
561 op,
562 &temporary_ids,
563 audit_events,
564 tx,
565 &*preliminary_state,
566 &mut storage_collections_to_create,
567 &mut storage_collections_to_drop,
568 &mut storage_collections_to_register,
569 )
570 .await?;
571
572 if let Some(builtin_table_update) = weird_builtin_table_update {
581 builtin_table_updates.push(builtin_table_update);
582 }
583
584 let upper = tx.upper();
589 let temporary_item_updates =
590 temporary_item_updates
591 .into_iter()
592 .map(|(item, diff)| StateUpdate {
593 kind: StateUpdateKind::TemporaryItem(item),
594 ts: upper,
595 diff,
596 });
597
598 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
599 op_updates.extend(temporary_item_updates);
600 if !op_updates.is_empty() {
601 let (_op_builtin_table_updates, _op_catalog_updates) =
602 preliminary_state
603 .to_mut()
604 .apply_updates(op_updates.clone())?;
605 }
606 updates.append(&mut op_updates);
607 }
608
609 if !updates.is_empty() {
610 let (op_builtin_table_updates, op_catalog_updates) =
611 state.to_mut().apply_updates(updates.clone())?;
612 let op_builtin_table_updates = state
613 .to_mut()
614 .resolve_builtin_table_updates(op_builtin_table_updates);
615 builtin_table_updates.extend(op_builtin_table_updates);
616 parsed_catalog_updates.extend(op_catalog_updates);
617 }
618
619 if dry_run_ops.is_empty() {
620 if let Some(c) = storage_collections {
622 c.prepare_state(
623 tx,
624 storage_collections_to_create,
625 storage_collections_to_drop,
626 storage_collections_to_register,
627 )
628 .await?;
629 }
630
631 let updates = tx.get_and_commit_op_updates();
632 if !updates.is_empty() {
633 let (op_builtin_table_updates, op_catalog_updates) =
634 state.to_mut().apply_updates(updates.clone())?;
635 let op_builtin_table_updates = state
636 .to_mut()
637 .resolve_builtin_table_updates(op_builtin_table_updates);
638 builtin_table_updates.extend(op_builtin_table_updates);
639 parsed_catalog_updates.extend(op_catalog_updates);
640 }
641
642 match state {
643 Cow::Owned(state) => Ok(Some(state)),
644 Cow::Borrowed(_) => Ok(None),
645 }
646 } else {
647 Err(AdapterError::TransactionDryRun {
648 new_ops: dry_run_ops,
649 new_state: state.into_owned(),
650 })
651 }
652 }
653
654 #[instrument]
662 async fn transact_op(
663 oracle_write_ts: mz_repr::Timestamp,
664 session: Option<&ConnMeta>,
665 op: Op,
666 temporary_ids: &BTreeSet<CatalogItemId>,
667 audit_events: &mut Vec<VersionedEvent>,
668 tx: &mut Transaction<'_>,
669 state: &CatalogState,
670 storage_collections_to_create: &mut BTreeSet<GlobalId>,
671 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
672 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
673 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
674 let mut weird_builtin_table_update = None;
675 let mut temporary_item_updates = Vec::new();
676
677 match op {
678 Op::TransactionDryRun => {
679 unreachable!("TransactionDryRun can only be used a final element of ops")
680 }
681 Op::AlterRetainHistory { id, value, window } => {
682 let entry = state.get_entry(&id);
683 if id.is_system() {
684 let name = entry.name();
685 let full_name =
686 state.resolve_full_name(name, session.map(|session| session.conn_id()));
687 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
688 full_name.to_string(),
689 ))));
690 }
691
692 let mut new_entry = entry.clone();
693 let previous = new_entry
694 .item
695 .update_retain_history(value.clone(), window)
696 .map_err(|_| {
697 AdapterError::Catalog(Error::new(ErrorKind::Internal(
698 "planner should have rejected invalid alter retain history item type"
699 .to_string(),
700 )))
701 })?;
702
703 if Self::should_audit_log_item(new_entry.item()) {
704 let details =
705 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
706 id: id.to_string(),
707 old_history: previous.map(|previous| previous.to_string()),
708 new_history: value.map(|v| v.to_string()),
709 });
710 CatalogState::add_to_audit_log(
711 &state.system_configuration,
712 oracle_write_ts,
713 session,
714 tx,
715 audit_events,
716 EventType::Alter,
717 catalog_type_to_audit_object_type(new_entry.item().typ()),
718 details,
719 )?;
720 }
721
722 tx.update_item(id, new_entry.into())?;
723
724 Self::log_update(state, &id);
725 }
726 Op::AlterRole {
727 id,
728 name,
729 attributes,
730 nopassword,
731 vars,
732 } => {
733 state.ensure_not_reserved_role(&id)?;
734
735 let mut existing_role = state.get_role(&id).clone();
736 let password = attributes.password.clone();
737 let scram_iterations = attributes
738 .scram_iterations
739 .unwrap_or_else(|| state.system_config().scram_iterations());
740 existing_role.attributes = attributes.into();
741 existing_role.vars = vars;
742 let password_action = if nopassword {
743 PasswordAction::Clear
744 } else if let Some(password) = password {
745 PasswordAction::Set(PasswordConfig {
746 password,
747 scram_iterations,
748 })
749 } else {
750 PasswordAction::NoChange
751 };
752 tx.update_role(id, existing_role.into(), password_action)?;
753
754 CatalogState::add_to_audit_log(
755 &state.system_configuration,
756 oracle_write_ts,
757 session,
758 tx,
759 audit_events,
760 EventType::Alter,
761 ObjectType::Role,
762 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
763 id: id.to_string(),
764 name: name.clone(),
765 }),
766 )?;
767
768 info!("update role {name} ({id})");
769 }
770 Op::AlterNetworkPolicy {
771 id,
772 rules,
773 name,
774 owner_id: _owner_id,
775 } => {
776 let existing_policy = state.get_network_policy(&id).clone();
777 let mut policy: NetworkPolicy = existing_policy.into();
778 policy.rules = rules;
779 if is_reserved_name(&name) {
780 return Err(AdapterError::Catalog(Error::new(
781 ErrorKind::ReservedNetworkPolicyName(name),
782 )));
783 }
784 tx.update_network_policy(id, policy.clone())?;
785
786 CatalogState::add_to_audit_log(
787 &state.system_configuration,
788 oracle_write_ts,
789 session,
790 tx,
791 audit_events,
792 EventType::Alter,
793 ObjectType::NetworkPolicy,
794 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
795 id: id.to_string(),
796 name: name.clone(),
797 }),
798 )?;
799
800 info!("update network policy {name} ({id})");
801 }
802 Op::AlterAddColumn {
803 id,
804 new_global_id,
805 name,
806 typ,
807 sql,
808 } => {
809 let mut new_entry = state.get_entry(&id).clone();
810 let version = new_entry.item.add_column(name, typ, sql)?;
811 let shard_id = state
814 .storage_metadata()
815 .get_collection_shard(new_entry.latest_global_id())?;
816
817 let CatalogItem::Table(table) = &mut new_entry.item else {
819 return Err(AdapterError::Unsupported("adding columns to non-Table"));
820 };
821 table.collections.insert(version, new_global_id);
822
823 tx.update_item(id, new_entry.into())?;
824 storage_collections_to_register.insert(new_global_id, shard_id);
825 }
826 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
827 let mut new_entry = state.get_entry(&id).clone();
828 let replacement = state.get_entry(&replacement_id);
829
830 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
831 return Err(AdapterError::internal(
832 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
833 "id must refer to a materialized view",
834 ));
835 };
836 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
837 return Err(AdapterError::internal(
838 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
839 "replacement_id must refer to a materialized view",
840 ));
841 };
842
843 mv.apply_replacement(replacement_mv.clone());
844
845 tx.remove_item(replacement_id)?;
846 tx.update_item(id, new_entry.into())?;
847
848 }
850 Op::CreateDatabase { name, owner_id } => {
851 let database_owner_privileges = vec![rbac::owner_privilege(
852 mz_sql::catalog::ObjectType::Database,
853 owner_id,
854 )];
855 let database_default_privileges = state
856 .default_privileges
857 .get_applicable_privileges(
858 owner_id,
859 None,
860 None,
861 mz_sql::catalog::ObjectType::Database,
862 )
863 .map(|item| item.mz_acl_item(owner_id));
864 let database_privileges: Vec<_> = merge_mz_acl_items(
865 database_owner_privileges
866 .into_iter()
867 .chain(database_default_privileges),
868 )
869 .collect();
870
871 let schema_owner_privileges = vec![rbac::owner_privilege(
872 mz_sql::catalog::ObjectType::Schema,
873 owner_id,
874 )];
875 let schema_default_privileges = state
876 .default_privileges
877 .get_applicable_privileges(
878 owner_id,
879 None,
880 None,
881 mz_sql::catalog::ObjectType::Schema,
882 )
883 .map(|item| item.mz_acl_item(owner_id))
884 .chain(std::iter::once(MzAclItem {
886 grantee: RoleId::Public,
887 grantor: owner_id,
888 acl_mode: AclMode::USAGE,
889 }));
890 let schema_privileges: Vec<_> = merge_mz_acl_items(
891 schema_owner_privileges
892 .into_iter()
893 .chain(schema_default_privileges),
894 )
895 .collect();
896
897 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
898 let (database_id, _) = tx.insert_user_database(
899 &name,
900 owner_id,
901 database_privileges.clone(),
902 &temporary_oids,
903 )?;
904 let (schema_id, _) = tx.insert_user_schema(
905 database_id,
906 DEFAULT_SCHEMA,
907 owner_id,
908 schema_privileges.clone(),
909 &temporary_oids,
910 )?;
911 CatalogState::add_to_audit_log(
912 &state.system_configuration,
913 oracle_write_ts,
914 session,
915 tx,
916 audit_events,
917 EventType::Create,
918 ObjectType::Database,
919 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
920 id: database_id.to_string(),
921 name: name.clone(),
922 }),
923 )?;
924 info!("create database {}", name);
925
926 CatalogState::add_to_audit_log(
927 &state.system_configuration,
928 oracle_write_ts,
929 session,
930 tx,
931 audit_events,
932 EventType::Create,
933 ObjectType::Schema,
934 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
935 id: schema_id.to_string(),
936 name: DEFAULT_SCHEMA.to_string(),
937 database_name: Some(name),
938 }),
939 )?;
940 }
941 Op::CreateSchema {
942 database_id,
943 schema_name,
944 owner_id,
945 } => {
946 if is_reserved_name(&schema_name) {
947 return Err(AdapterError::Catalog(Error::new(
948 ErrorKind::ReservedSchemaName(schema_name),
949 )));
950 }
951 let database_id = match database_id {
952 ResolvedDatabaseSpecifier::Id(id) => id,
953 ResolvedDatabaseSpecifier::Ambient => {
954 return Err(AdapterError::Catalog(Error::new(
955 ErrorKind::ReadOnlySystemSchema(schema_name),
956 )));
957 }
958 };
959 let owner_privileges = vec![rbac::owner_privilege(
960 mz_sql::catalog::ObjectType::Schema,
961 owner_id,
962 )];
963 let default_privileges = state
964 .default_privileges
965 .get_applicable_privileges(
966 owner_id,
967 Some(database_id),
968 None,
969 mz_sql::catalog::ObjectType::Schema,
970 )
971 .map(|item| item.mz_acl_item(owner_id));
972 let privileges: Vec<_> =
973 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
974 .collect();
975 let (schema_id, _) = tx.insert_user_schema(
976 database_id,
977 &schema_name,
978 owner_id,
979 privileges.clone(),
980 &state.get_temporary_oids().collect(),
981 )?;
982 CatalogState::add_to_audit_log(
983 &state.system_configuration,
984 oracle_write_ts,
985 session,
986 tx,
987 audit_events,
988 EventType::Create,
989 ObjectType::Schema,
990 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
991 id: schema_id.to_string(),
992 name: schema_name.clone(),
993 database_name: Some(state.database_by_id[&database_id].name.clone()),
994 }),
995 )?;
996 }
997 Op::CreateRole { name, attributes } => {
998 if is_reserved_role_name(&name) {
999 return Err(AdapterError::Catalog(Error::new(
1000 ErrorKind::ReservedRoleName(name),
1001 )));
1002 }
1003 let membership = RoleMembership::new();
1004 let vars = RoleVars::default();
1005 let (id, _) = tx.insert_user_role(
1006 name.clone(),
1007 attributes.clone(),
1008 membership.clone(),
1009 vars.clone(),
1010 &state.get_temporary_oids().collect(),
1011 )?;
1012 CatalogState::add_to_audit_log(
1013 &state.system_configuration,
1014 oracle_write_ts,
1015 session,
1016 tx,
1017 audit_events,
1018 EventType::Create,
1019 ObjectType::Role,
1020 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1021 id: id.to_string(),
1022 name: name.clone(),
1023 }),
1024 )?;
1025 info!("create role {}", name);
1026 }
1027 Op::CreateCluster {
1028 id,
1029 name,
1030 introspection_sources,
1031 owner_id,
1032 config,
1033 } => {
1034 if is_reserved_name(&name) {
1035 return Err(AdapterError::Catalog(Error::new(
1036 ErrorKind::ReservedClusterName(name),
1037 )));
1038 }
1039 let owner_privileges = vec![rbac::owner_privilege(
1040 mz_sql::catalog::ObjectType::Cluster,
1041 owner_id,
1042 )];
1043 let default_privileges = state
1044 .default_privileges
1045 .get_applicable_privileges(
1046 owner_id,
1047 None,
1048 None,
1049 mz_sql::catalog::ObjectType::Cluster,
1050 )
1051 .map(|item| item.mz_acl_item(owner_id));
1052 let privileges: Vec<_> =
1053 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1054 .collect();
1055 let introspection_source_ids: Vec<_> = introspection_sources
1056 .iter()
1057 .map(|introspection_source| {
1058 Transaction::allocate_introspection_source_index_id(
1059 &id,
1060 introspection_source.variant,
1061 )
1062 })
1063 .collect();
1064
1065 let introspection_sources = introspection_sources
1066 .into_iter()
1067 .zip_eq(introspection_source_ids)
1068 .map(|(log, (item_id, gid))| (log, item_id, gid))
1069 .collect();
1070
1071 tx.insert_user_cluster(
1072 id,
1073 &name,
1074 introspection_sources,
1075 owner_id,
1076 privileges.clone(),
1077 config.clone().into(),
1078 &state.get_temporary_oids().collect(),
1079 )?;
1080 CatalogState::add_to_audit_log(
1081 &state.system_configuration,
1082 oracle_write_ts,
1083 session,
1084 tx,
1085 audit_events,
1086 EventType::Create,
1087 ObjectType::Cluster,
1088 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1089 id: id.to_string(),
1090 name: name.clone(),
1091 }),
1092 )?;
1093 info!("create cluster {}", name);
1094 }
1095 Op::CreateClusterReplica {
1096 cluster_id,
1097 name,
1098 config,
1099 owner_id,
1100 reason,
1101 } => {
1102 if is_reserved_name(&name) {
1103 return Err(AdapterError::Catalog(Error::new(
1104 ErrorKind::ReservedReplicaName(name),
1105 )));
1106 }
1107 let cluster = state.get_cluster(cluster_id);
1108 let id =
1109 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1110 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1111 size,
1112 billed_as,
1113 internal,
1114 ..
1115 }) = &config.location
1116 {
1117 let (reason, scheduling_policies) = reason.into_audit_log();
1118 let details = EventDetails::CreateClusterReplicaV4(
1119 mz_audit_log::CreateClusterReplicaV4 {
1120 cluster_id: cluster_id.to_string(),
1121 cluster_name: cluster.name.clone(),
1122 replica_id: Some(id.to_string()),
1123 replica_name: name.clone(),
1124 logical_size: size.clone(),
1125 billed_as: billed_as.clone(),
1126 internal: *internal,
1127 reason,
1128 scheduling_policies,
1129 },
1130 );
1131 CatalogState::add_to_audit_log(
1132 &state.system_configuration,
1133 oracle_write_ts,
1134 session,
1135 tx,
1136 audit_events,
1137 EventType::Create,
1138 ObjectType::ClusterReplica,
1139 details,
1140 )?;
1141 }
1142 }
1143 Op::CreateItem {
1144 id,
1145 name,
1146 item,
1147 owner_id,
1148 } => {
1149 state.check_unstable_dependencies(&item)?;
1150
1151 match &item {
1152 CatalogItem::Table(table) => {
1153 let gids: Vec<_> = table.global_ids().collect();
1154 assert_eq!(gids.len(), 1);
1155 storage_collections_to_create.extend(gids);
1156 }
1157 CatalogItem::Source(source) => {
1158 storage_collections_to_create.insert(source.global_id());
1159 }
1160 CatalogItem::MaterializedView(mv) => {
1161 let mv_gid = mv.global_id_writes();
1162 if let Some(target_id) = mv.replacement_target {
1163 let target_gid = state.get_entry(&target_id).latest_global_id();
1164 let shard_id =
1165 state.storage_metadata().get_collection_shard(target_gid)?;
1166 storage_collections_to_register.insert(mv_gid, shard_id);
1167 } else {
1168 storage_collections_to_create.insert(mv_gid);
1169 }
1170 }
1171 CatalogItem::ContinualTask(ct) => {
1172 storage_collections_to_create.insert(ct.global_id());
1173 }
1174 CatalogItem::Sink(sink) => {
1175 storage_collections_to_create.insert(sink.global_id());
1176 }
1177 CatalogItem::Log(_)
1178 | CatalogItem::View(_)
1179 | CatalogItem::Index(_)
1180 | CatalogItem::Type(_)
1181 | CatalogItem::Func(_)
1182 | CatalogItem::Secret(_)
1183 | CatalogItem::Connection(_) => (),
1184 }
1185
1186 let system_user = session.map_or(false, |s| s.user().is_system_user());
1187 if !system_user {
1188 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1189 let cluster_name = state.clusters_by_id[&id].name.clone();
1190 return Err(AdapterError::Catalog(Error::new(
1191 ErrorKind::ReadOnlyCluster(cluster_name),
1192 )));
1193 }
1194 }
1195
1196 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1197 let default_privileges = state
1198 .default_privileges
1199 .get_applicable_privileges(
1200 owner_id,
1201 name.qualifiers.database_spec.id(),
1202 Some(name.qualifiers.schema_spec.into()),
1203 item.typ().into(),
1204 )
1205 .map(|item| item.mz_acl_item(owner_id));
1206 let progress_source_privilege = if item.is_progress_source() {
1208 Some(MzAclItem {
1209 grantee: MZ_SUPPORT_ROLE_ID,
1210 grantor: owner_id,
1211 acl_mode: AclMode::SELECT,
1212 })
1213 } else {
1214 None
1215 };
1216 let privileges: Vec<_> = merge_mz_acl_items(
1217 owner_privileges
1218 .into_iter()
1219 .chain(default_privileges)
1220 .chain(progress_source_privilege),
1221 )
1222 .collect();
1223
1224 let temporary_oids = state.get_temporary_oids().collect();
1225
1226 if item.is_temporary() {
1227 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1228 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1229 {
1230 return Err(AdapterError::Catalog(Error::new(
1231 ErrorKind::InvalidTemporarySchema,
1232 )));
1233 }
1234 let oid = tx.allocate_oid(&temporary_oids)?;
1235
1236 let schema_id = name.qualifiers.schema_spec.clone().into();
1237 let item_type = item.typ();
1238 let (create_sql, global_id, versions) = item.to_serialized();
1239
1240 let item = TemporaryItem {
1241 id,
1242 oid,
1243 global_id,
1244 schema_id,
1245 name: name.item.clone(),
1246 create_sql,
1247 conn_id: item.conn_id().cloned(),
1248 owner_id,
1249 privileges: privileges.clone(),
1250 extra_versions: versions,
1251 };
1252 temporary_item_updates.push((item, StateDiff::Addition));
1253
1254 info!(
1255 "create temporary {} {} ({})",
1256 item_type,
1257 state.resolve_full_name(&name, None),
1258 id
1259 );
1260 } else {
1261 if let Some(temp_id) =
1262 item.uses()
1263 .iter()
1264 .find(|id| match state.try_get_entry(*id) {
1265 Some(entry) => entry.item().is_temporary(),
1266 None => temporary_ids.contains(id),
1267 })
1268 {
1269 let temp_item = state.get_entry(temp_id);
1270 return Err(AdapterError::Catalog(Error::new(
1271 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1272 )));
1273 }
1274 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1275 && !system_user
1276 {
1277 let schema_name = state
1278 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1279 .schema;
1280 return Err(AdapterError::Catalog(Error::new(
1281 ErrorKind::ReadOnlySystemSchema(schema_name),
1282 )));
1283 }
1284 let schema_id = name.qualifiers.schema_spec.clone().into();
1285 let item_type = item.typ();
1286 let (create_sql, global_id, versions) = item.to_serialized();
1287 tx.insert_user_item(
1288 id,
1289 global_id,
1290 schema_id,
1291 &name.item,
1292 create_sql,
1293 owner_id,
1294 privileges.clone(),
1295 &temporary_oids,
1296 versions,
1297 )?;
1298 info!(
1299 "create {} {} ({})",
1300 item_type,
1301 state.resolve_full_name(&name, None),
1302 id
1303 );
1304 }
1305
1306 if Self::should_audit_log_item(&item) {
1307 let name = Self::full_name_detail(
1308 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1309 );
1310 let details = match &item {
1311 CatalogItem::Source(s) => {
1312 let cluster_id = match s.data_source {
1313 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1316 match state.get_entry(&ingestion_id).cluster_id() {
1317 Some(cluster_id) => Some(cluster_id.to_string()),
1318 None => None,
1319 }
1320 }
1321 _ => match item.cluster_id() {
1322 Some(cluster_id) => Some(cluster_id.to_string()),
1323 None => None,
1324 },
1325 };
1326
1327 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1328 id: id.to_string(),
1329 cluster_id,
1330 name,
1331 external_type: s.source_type().to_string(),
1332 })
1333 }
1334 CatalogItem::Sink(s) => {
1335 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1336 id: id.to_string(),
1337 cluster_id: Some(s.cluster_id.to_string()),
1338 name,
1339 external_type: s.sink_type().to_string(),
1340 })
1341 }
1342 CatalogItem::Index(i) => {
1343 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1344 id: id.to_string(),
1345 name,
1346 cluster_id: i.cluster_id.to_string(),
1347 })
1348 }
1349 CatalogItem::MaterializedView(mv) => {
1350 EventDetails::CreateMaterializedViewV1(
1351 mz_audit_log::CreateMaterializedViewV1 {
1352 id: id.to_string(),
1353 name,
1354 cluster_id: mv.cluster_id.to_string(),
1355 },
1356 )
1357 }
1358 _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1359 id: id.to_string(),
1360 name,
1361 }),
1362 };
1363 CatalogState::add_to_audit_log(
1364 &state.system_configuration,
1365 oracle_write_ts,
1366 session,
1367 tx,
1368 audit_events,
1369 EventType::Create,
1370 catalog_type_to_audit_object_type(item.typ()),
1371 details,
1372 )?;
1373 }
1374 }
1375 Op::CreateNetworkPolicy {
1376 rules,
1377 name,
1378 owner_id,
1379 } => {
1380 if state.network_policies_by_name.contains_key(&name) {
1381 return Err(AdapterError::PlanError(PlanError::Catalog(
1382 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1383 )));
1384 }
1385 if is_reserved_name(&name) {
1386 return Err(AdapterError::Catalog(Error::new(
1387 ErrorKind::ReservedNetworkPolicyName(name),
1388 )));
1389 }
1390
1391 let owner_privileges = vec![rbac::owner_privilege(
1392 mz_sql::catalog::ObjectType::NetworkPolicy,
1393 owner_id,
1394 )];
1395 let default_privileges = state
1396 .default_privileges
1397 .get_applicable_privileges(
1398 owner_id,
1399 None,
1400 None,
1401 mz_sql::catalog::ObjectType::NetworkPolicy,
1402 )
1403 .map(|item| item.mz_acl_item(owner_id));
1404 let privileges: Vec<_> =
1405 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1406 .collect();
1407
1408 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1409 let id = tx.insert_user_network_policy(
1410 name.clone(),
1411 rules,
1412 privileges,
1413 owner_id,
1414 &temporary_oids,
1415 )?;
1416
1417 CatalogState::add_to_audit_log(
1418 &state.system_configuration,
1419 oracle_write_ts,
1420 session,
1421 tx,
1422 audit_events,
1423 EventType::Create,
1424 ObjectType::NetworkPolicy,
1425 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1426 id: id.to_string(),
1427 name: name.clone(),
1428 }),
1429 )?;
1430
1431 info!("created network policy {name} ({id})");
1432 }
1433 Op::Comment {
1434 object_id,
1435 sub_component,
1436 comment,
1437 } => {
1438 tx.update_comment(object_id, sub_component, comment)?;
1439 let entry = state.get_comment_id_entry(&object_id);
1440 let should_log = entry
1441 .map(|entry| Self::should_audit_log_item(entry.item()))
1442 .unwrap_or(true);
1444 if let (Some(conn_id), true) =
1447 (session.map(|session| session.conn_id()), should_log)
1448 {
1449 CatalogState::add_to_audit_log(
1450 &state.system_configuration,
1451 oracle_write_ts,
1452 session,
1453 tx,
1454 audit_events,
1455 EventType::Comment,
1456 comment_id_to_audit_object_type(object_id),
1457 EventDetails::IdNameV1(IdNameV1 {
1458 id: format!("{object_id:?}"),
1460 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1461 }),
1462 )?;
1463 }
1464 }
1465 Op::UpdateSourceReferences {
1466 source_id,
1467 references,
1468 } => {
1469 tx.update_source_references(
1470 source_id,
1471 references
1472 .references
1473 .into_iter()
1474 .map(|reference| reference.into())
1475 .collect(),
1476 references.updated_at,
1477 )?;
1478 }
1479 Op::DropObjects(drop_object_infos) => {
1480 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1482
1483 tx.drop_comments(&delta.comments)?;
1485
1486 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1488 delta
1489 .items
1490 .iter()
1491 .map(|id| id)
1492 .partition(|id| !state.get_entry(*id).item().is_temporary());
1493 tx.remove_items(&durable_items_to_drop)?;
1494 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1495 let entry = state.get_entry(&id);
1496 (entry.clone().into(), StateDiff::Retraction)
1497 }));
1498
1499 for item_id in delta.items {
1500 let entry = state.get_entry(&item_id);
1501
1502 if entry.item().is_storage_collection() {
1503 storage_collections_to_drop.extend(entry.global_ids());
1504 }
1505
1506 if state.source_references.contains_key(&item_id) {
1507 tx.remove_source_references(item_id)?;
1508 }
1509
1510 if Self::should_audit_log_item(entry.item()) {
1511 CatalogState::add_to_audit_log(
1512 &state.system_configuration,
1513 oracle_write_ts,
1514 session,
1515 tx,
1516 audit_events,
1517 EventType::Drop,
1518 catalog_type_to_audit_object_type(entry.item().typ()),
1519 EventDetails::IdFullNameV1(IdFullNameV1 {
1520 id: item_id.to_string(),
1521 name: Self::full_name_detail(&state.resolve_full_name(
1522 entry.name(),
1523 session.map(|session| session.conn_id()),
1524 )),
1525 }),
1526 )?;
1527 }
1528 info!(
1529 "drop {} {} ({})",
1530 entry.item_type(),
1531 state.resolve_full_name(entry.name(), entry.conn_id()),
1532 item_id
1533 );
1534 }
1535
1536 let schemas = delta
1538 .schemas
1539 .iter()
1540 .map(|(schema_spec, database_spec)| {
1541 (SchemaId::from(schema_spec), *database_spec)
1542 })
1543 .collect();
1544 tx.remove_schemas(&schemas)?;
1545
1546 for (schema_spec, database_spec) in delta.schemas {
1547 let schema = state.get_schema(
1548 &database_spec,
1549 &schema_spec,
1550 session
1551 .map(|session| session.conn_id())
1552 .unwrap_or(&SYSTEM_CONN_ID),
1553 );
1554
1555 let schema_id = SchemaId::from(schema_spec);
1556 let database_id = match database_spec {
1557 ResolvedDatabaseSpecifier::Ambient => None,
1558 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1559 };
1560
1561 CatalogState::add_to_audit_log(
1562 &state.system_configuration,
1563 oracle_write_ts,
1564 session,
1565 tx,
1566 audit_events,
1567 EventType::Drop,
1568 ObjectType::Schema,
1569 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1570 id: schema_id.to_string(),
1571 name: schema.name.schema.to_string(),
1572 database_name: database_id
1573 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1574 }),
1575 )?;
1576 }
1577
1578 tx.remove_databases(&delta.databases)?;
1580
1581 for database_id in delta.databases {
1582 let database = state.get_database(&database_id).clone();
1583
1584 CatalogState::add_to_audit_log(
1585 &state.system_configuration,
1586 oracle_write_ts,
1587 session,
1588 tx,
1589 audit_events,
1590 EventType::Drop,
1591 ObjectType::Database,
1592 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1593 id: database_id.to_string(),
1594 name: database.name.clone(),
1595 }),
1596 )?;
1597 }
1598
1599 tx.remove_user_roles(&delta.roles)?;
1601
1602 for role_id in delta.roles {
1603 let role = state
1604 .roles_by_id
1605 .get(&role_id)
1606 .expect("catalog out of sync");
1607
1608 CatalogState::add_to_audit_log(
1609 &state.system_configuration,
1610 oracle_write_ts,
1611 session,
1612 tx,
1613 audit_events,
1614 EventType::Drop,
1615 ObjectType::Role,
1616 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1617 id: role.id.to_string(),
1618 name: role.name.clone(),
1619 }),
1620 )?;
1621 info!("drop role {}", role.name());
1622 }
1623
1624 tx.remove_network_policies(&delta.network_policies)?;
1626
1627 for network_policy_id in delta.network_policies {
1628 let policy = state
1629 .network_policies_by_id
1630 .get(&network_policy_id)
1631 .expect("catalog out of sync");
1632
1633 CatalogState::add_to_audit_log(
1634 &state.system_configuration,
1635 oracle_write_ts,
1636 session,
1637 tx,
1638 audit_events,
1639 EventType::Drop,
1640 ObjectType::NetworkPolicy,
1641 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1642 id: policy.id.to_string(),
1643 name: policy.name.clone(),
1644 }),
1645 )?;
1646 info!("drop network policy {}", policy.name.clone());
1647 }
1648
1649 let replicas = delta.replicas.keys().copied().collect();
1651 tx.remove_cluster_replicas(&replicas)?;
1652
1653 for (replica_id, (cluster_id, reason)) in delta.replicas {
1654 let cluster = state.get_cluster(cluster_id);
1655 let replica = cluster.replica(replica_id).expect("Must exist");
1656
1657 let (reason, scheduling_policies) = reason.into_audit_log();
1658 let details =
1659 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1660 cluster_id: cluster_id.to_string(),
1661 cluster_name: cluster.name.clone(),
1662 replica_id: Some(replica_id.to_string()),
1663 replica_name: replica.name.clone(),
1664 reason,
1665 scheduling_policies,
1666 });
1667 CatalogState::add_to_audit_log(
1668 &state.system_configuration,
1669 oracle_write_ts,
1670 session,
1671 tx,
1672 audit_events,
1673 EventType::Drop,
1674 ObjectType::ClusterReplica,
1675 details,
1676 )?;
1677 }
1678
1679 tx.remove_clusters(&delta.clusters)?;
1681
1682 for cluster_id in delta.clusters {
1683 let cluster = state.get_cluster(cluster_id);
1684
1685 CatalogState::add_to_audit_log(
1686 &state.system_configuration,
1687 oracle_write_ts,
1688 session,
1689 tx,
1690 audit_events,
1691 EventType::Drop,
1692 ObjectType::Cluster,
1693 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1694 id: cluster.id.to_string(),
1695 name: cluster.name.clone(),
1696 }),
1697 )?;
1698 }
1699 }
1700 Op::GrantRole {
1701 role_id,
1702 member_id,
1703 grantor_id,
1704 } => {
1705 state.ensure_not_reserved_role(&member_id)?;
1706 state.ensure_grantable_role(&role_id)?;
1707 if state.collect_role_membership(&role_id).contains(&member_id) {
1708 let group_role = state.get_role(&role_id);
1709 let member_role = state.get_role(&member_id);
1710 return Err(AdapterError::Catalog(Error::new(
1711 ErrorKind::CircularRoleMembership {
1712 role_name: group_role.name().to_string(),
1713 member_name: member_role.name().to_string(),
1714 },
1715 )));
1716 }
1717 let mut member_role = state.get_role(&member_id).clone();
1718 member_role.membership.map.insert(role_id, grantor_id);
1719 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1720
1721 CatalogState::add_to_audit_log(
1722 &state.system_configuration,
1723 oracle_write_ts,
1724 session,
1725 tx,
1726 audit_events,
1727 EventType::Grant,
1728 ObjectType::Role,
1729 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1730 role_id: role_id.to_string(),
1731 member_id: member_id.to_string(),
1732 grantor_id: grantor_id.to_string(),
1733 executed_by: session
1734 .map(|session| session.authenticated_role_id())
1735 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1736 .to_string(),
1737 }),
1738 )?;
1739 }
1740 Op::RevokeRole {
1741 role_id,
1742 member_id,
1743 grantor_id,
1744 } => {
1745 state.ensure_not_reserved_role(&member_id)?;
1746 state.ensure_grantable_role(&role_id)?;
1747 let mut member_role = state.get_role(&member_id).clone();
1748 member_role.membership.map.remove(&role_id);
1749 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1750
1751 CatalogState::add_to_audit_log(
1752 &state.system_configuration,
1753 oracle_write_ts,
1754 session,
1755 tx,
1756 audit_events,
1757 EventType::Revoke,
1758 ObjectType::Role,
1759 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1760 role_id: role_id.to_string(),
1761 member_id: member_id.to_string(),
1762 grantor_id: grantor_id.to_string(),
1763 executed_by: session
1764 .map(|session| session.authenticated_role_id())
1765 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1766 .to_string(),
1767 }),
1768 )?;
1769 }
1770 Op::UpdatePrivilege {
1771 target_id,
1772 privilege,
1773 variant,
1774 } => {
1775 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1776 UpdatePrivilegeVariant::Grant => {
1777 privileges.grant(privilege);
1778 }
1779 UpdatePrivilegeVariant::Revoke => {
1780 privileges.revoke(&privilege);
1781 }
1782 };
1783 match &target_id {
1784 SystemObjectId::Object(object_id) => match object_id {
1785 ObjectId::Cluster(id) => {
1786 let mut cluster = state.get_cluster(*id).clone();
1787 update_privilege_fn(&mut cluster.privileges);
1788 tx.update_cluster(*id, cluster.into())?;
1789 }
1790 ObjectId::Database(id) => {
1791 let mut database = state.get_database(id).clone();
1792 update_privilege_fn(&mut database.privileges);
1793 tx.update_database(*id, database.into())?;
1794 }
1795 ObjectId::NetworkPolicy(id) => {
1796 let mut policy = state.get_network_policy(id).clone();
1797 update_privilege_fn(&mut policy.privileges);
1798 tx.update_network_policy(*id, policy.into())?;
1799 }
1800 ObjectId::Schema((database_spec, schema_spec)) => {
1801 let schema_id = schema_spec.clone().into();
1802 let mut schema = state
1803 .get_schema(
1804 database_spec,
1805 schema_spec,
1806 session
1807 .map(|session| session.conn_id())
1808 .unwrap_or(&SYSTEM_CONN_ID),
1809 )
1810 .clone();
1811 update_privilege_fn(&mut schema.privileges);
1812 tx.update_schema(schema_id, schema.into())?;
1813 }
1814 ObjectId::Item(id) => {
1815 let entry = state.get_entry(id);
1816 let mut new_entry = entry.clone();
1817 update_privilege_fn(&mut new_entry.privileges);
1818 if !new_entry.item().is_temporary() {
1819 tx.update_item(*id, new_entry.into())?;
1820 } else {
1821 temporary_item_updates
1822 .push((entry.clone().into(), StateDiff::Retraction));
1823 temporary_item_updates
1824 .push((new_entry.into(), StateDiff::Addition));
1825 }
1826 }
1827 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1828 },
1829 SystemObjectId::System => {
1830 let mut system_privileges = state.system_privileges.clone();
1831 update_privilege_fn(&mut system_privileges);
1832 let new_privilege =
1833 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1834 tx.set_system_privilege(
1835 privilege.grantee,
1836 privilege.grantor,
1837 new_privilege.map(|new_privilege| new_privilege.acl_mode),
1838 )?;
1839 }
1840 }
1841 let object_type = state.get_system_object_type(&target_id);
1842 let object_id_str = match &target_id {
1843 SystemObjectId::System => "SYSTEM".to_string(),
1844 SystemObjectId::Object(id) => id.to_string(),
1845 };
1846 CatalogState::add_to_audit_log(
1847 &state.system_configuration,
1848 oracle_write_ts,
1849 session,
1850 tx,
1851 audit_events,
1852 variant.into(),
1853 system_object_type_to_audit_object_type(&object_type),
1854 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1855 object_id: object_id_str,
1856 grantee_id: privilege.grantee.to_string(),
1857 grantor_id: privilege.grantor.to_string(),
1858 privileges: privilege.acl_mode.to_string(),
1859 }),
1860 )?;
1861 }
1862 Op::UpdateDefaultPrivilege {
1863 privilege_object,
1864 privilege_acl_item,
1865 variant,
1866 } => {
1867 let mut default_privileges = state.default_privileges.clone();
1868 match variant {
1869 UpdatePrivilegeVariant::Grant => default_privileges
1870 .grant(privilege_object.clone(), privilege_acl_item.clone()),
1871 UpdatePrivilegeVariant::Revoke => {
1872 default_privileges.revoke(&privilege_object, &privilege_acl_item)
1873 }
1874 }
1875 let new_acl_mode = default_privileges
1876 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1877 tx.set_default_privilege(
1878 privilege_object.role_id,
1879 privilege_object.database_id,
1880 privilege_object.schema_id,
1881 privilege_object.object_type,
1882 privilege_acl_item.grantee,
1883 new_acl_mode.cloned(),
1884 )?;
1885 CatalogState::add_to_audit_log(
1886 &state.system_configuration,
1887 oracle_write_ts,
1888 session,
1889 tx,
1890 audit_events,
1891 variant.into(),
1892 object_type_to_audit_object_type(privilege_object.object_type),
1893 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1894 role_id: privilege_object.role_id.to_string(),
1895 database_id: privilege_object.database_id.map(|id| id.to_string()),
1896 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1897 grantee_id: privilege_acl_item.grantee.to_string(),
1898 privileges: privilege_acl_item.acl_mode.to_string(),
1899 }),
1900 )?;
1901 }
1902 Op::RenameCluster {
1903 id,
1904 name,
1905 to_name,
1906 check_reserved_names,
1907 } => {
1908 if id.is_system() {
1909 return Err(AdapterError::Catalog(Error::new(
1910 ErrorKind::ReadOnlyCluster(name.clone()),
1911 )));
1912 }
1913 if check_reserved_names && is_reserved_name(&to_name) {
1914 return Err(AdapterError::Catalog(Error::new(
1915 ErrorKind::ReservedClusterName(to_name),
1916 )));
1917 }
1918 tx.rename_cluster(id, &name, &to_name)?;
1919 CatalogState::add_to_audit_log(
1920 &state.system_configuration,
1921 oracle_write_ts,
1922 session,
1923 tx,
1924 audit_events,
1925 EventType::Alter,
1926 ObjectType::Cluster,
1927 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1928 id: id.to_string(),
1929 old_name: name.clone(),
1930 new_name: to_name.clone(),
1931 }),
1932 )?;
1933 info!("rename cluster {name} to {to_name}");
1934 }
1935 Op::RenameClusterReplica {
1936 cluster_id,
1937 replica_id,
1938 name,
1939 to_name,
1940 } => {
1941 if is_reserved_name(&to_name) {
1942 return Err(AdapterError::Catalog(Error::new(
1943 ErrorKind::ReservedReplicaName(to_name),
1944 )));
1945 }
1946 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1947 CatalogState::add_to_audit_log(
1948 &state.system_configuration,
1949 oracle_write_ts,
1950 session,
1951 tx,
1952 audit_events,
1953 EventType::Alter,
1954 ObjectType::ClusterReplica,
1955 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1956 cluster_id: cluster_id.to_string(),
1957 replica_id: replica_id.to_string(),
1958 old_name: name.replica.as_str().to_string(),
1959 new_name: to_name.clone(),
1960 }),
1961 )?;
1962 info!("rename cluster replica {name} to {to_name}");
1963 }
1964 Op::RenameItem {
1965 id,
1966 to_name,
1967 current_full_name,
1968 } => {
1969 let mut updates = Vec::new();
1970
1971 let entry = state.get_entry(&id);
1972 if let CatalogItem::Type(_) = entry.item() {
1973 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1974 current_full_name.to_string(),
1975 ))));
1976 }
1977
1978 if entry.id().is_system() {
1979 let name = state
1980 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1981 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1982 name.to_string(),
1983 ))));
1984 }
1985
1986 let mut to_full_name = current_full_name.clone();
1987 to_full_name.item.clone_from(&to_name);
1988
1989 let mut to_qualified_name = entry.name().clone();
1990 to_qualified_name.item.clone_from(&to_name);
1991
1992 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1993 id: id.to_string(),
1994 old_name: Self::full_name_detail(¤t_full_name),
1995 new_name: Self::full_name_detail(&to_full_name),
1996 });
1997 if Self::should_audit_log_item(entry.item()) {
1998 CatalogState::add_to_audit_log(
1999 &state.system_configuration,
2000 oracle_write_ts,
2001 session,
2002 tx,
2003 audit_events,
2004 EventType::Alter,
2005 catalog_type_to_audit_object_type(entry.item().typ()),
2006 details,
2007 )?;
2008 }
2009
2010 let mut new_entry = entry.clone();
2012 new_entry.name.item.clone_from(&to_name);
2013 new_entry.item = entry
2014 .item()
2015 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2016 .map_err(|e| {
2017 Error::new(ErrorKind::from(AmbiguousRename {
2018 depender: state
2019 .resolve_full_name(entry.name(), entry.conn_id())
2020 .to_string(),
2021 dependee: state
2022 .resolve_full_name(entry.name(), entry.conn_id())
2023 .to_string(),
2024 message: e,
2025 }))
2026 })?;
2027
2028 for id in entry.referenced_by() {
2029 let dependent_item = state.get_entry(id);
2030 let mut to_entry = dependent_item.clone();
2031 to_entry.item = dependent_item
2032 .item()
2033 .rename_item_refs(
2034 current_full_name.clone(),
2035 to_full_name.item.clone(),
2036 false,
2037 )
2038 .map_err(|e| {
2039 Error::new(ErrorKind::from(AmbiguousRename {
2040 depender: state
2041 .resolve_full_name(
2042 dependent_item.name(),
2043 dependent_item.conn_id(),
2044 )
2045 .to_string(),
2046 dependee: state
2047 .resolve_full_name(entry.name(), entry.conn_id())
2048 .to_string(),
2049 message: e,
2050 }))
2051 })?;
2052
2053 if !to_entry.item().is_temporary() {
2054 tx.update_item(*id, to_entry.into())?;
2055 } else {
2056 temporary_item_updates
2057 .push((dependent_item.clone().into(), StateDiff::Retraction));
2058 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2059 }
2060 updates.push(*id);
2061 }
2062 if !new_entry.item().is_temporary() {
2063 tx.update_item(id, new_entry.into())?;
2064 } else {
2065 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2066 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2067 }
2068
2069 updates.push(id);
2070 for id in updates {
2071 Self::log_update(state, &id);
2072 }
2073 }
2074 Op::RenameSchema {
2075 database_spec,
2076 schema_spec,
2077 new_name,
2078 check_reserved_names,
2079 } => {
2080 if check_reserved_names && is_reserved_name(&new_name) {
2081 return Err(AdapterError::Catalog(Error::new(
2082 ErrorKind::ReservedSchemaName(new_name),
2083 )));
2084 }
2085
2086 let conn_id = session
2087 .map(|session| session.conn_id())
2088 .unwrap_or(&SYSTEM_CONN_ID);
2089
2090 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2091 let cur_name = schema.name().schema.clone();
2092
2093 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2094 return Err(AdapterError::Catalog(Error::new(
2095 ErrorKind::AmbientSchemaRename(cur_name),
2096 )));
2097 };
2098 let database = state.get_database(&database_id);
2099 let database_name = &database.name;
2100
2101 let mut updates = Vec::new();
2102 let mut items_to_update = BTreeMap::new();
2103
2104 let mut update_item = |id| {
2105 if items_to_update.contains_key(id) {
2106 return Ok(());
2107 }
2108
2109 let entry = state.get_entry(id);
2110
2111 let mut new_entry = entry.clone();
2113 new_entry.item = entry
2114 .item
2115 .rename_schema_refs(database_name, &cur_name, &new_name)
2116 .map_err(|(s, _i)| {
2117 Error::new(ErrorKind::from(AmbiguousRename {
2118 depender: state
2119 .resolve_full_name(entry.name(), entry.conn_id())
2120 .to_string(),
2121 dependee: format!("{database_name}.{cur_name}"),
2122 message: format!("ambiguous reference to schema named {s}"),
2123 }))
2124 })?;
2125
2126 if !new_entry.item().is_temporary() {
2128 items_to_update.insert(*id, new_entry.into());
2129 } else {
2130 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2131 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2132 }
2133 updates.push(id);
2134
2135 Ok::<_, AdapterError>(())
2136 };
2137
2138 for (_name, item_id) in &schema.items {
2140 update_item(item_id)?;
2142
2143 for id in state.get_entry(item_id).referenced_by() {
2145 update_item(id)?;
2146 }
2147 }
2148 tx.update_items(items_to_update)?;
2151
2152 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2154 let schema_name = schema.name().schema.clone();
2155 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2156 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2157 )));
2158 };
2159
2160 let database_name = database_spec
2162 .id()
2163 .map(|id| state.get_database(&id).name.clone());
2164 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2165 id: schema_id.to_string(),
2166 old_name: schema.name().schema.clone(),
2167 new_name: new_name.clone(),
2168 database_name,
2169 });
2170 CatalogState::add_to_audit_log(
2171 &state.system_configuration,
2172 oracle_write_ts,
2173 session,
2174 tx,
2175 audit_events,
2176 EventType::Alter,
2177 mz_audit_log::ObjectType::Schema,
2178 details,
2179 )?;
2180
2181 let mut new_schema = schema.clone();
2183 new_schema.name.schema.clone_from(&new_name);
2184 tx.update_schema(schema_id, new_schema.into())?;
2185
2186 for id in updates {
2187 Self::log_update(state, id);
2188 }
2189 }
2190 Op::UpdateOwner { id, new_owner } => {
2191 let conn_id = session
2192 .map(|session| session.conn_id())
2193 .unwrap_or(&SYSTEM_CONN_ID);
2194 let old_owner = state
2195 .get_owner_id(&id, conn_id)
2196 .expect("cannot update the owner of an object without an owner");
2197 match &id {
2198 ObjectId::Cluster(id) => {
2199 let mut cluster = state.get_cluster(*id).clone();
2200 if id.is_system() {
2201 return Err(AdapterError::Catalog(Error::new(
2202 ErrorKind::ReadOnlyCluster(cluster.name),
2203 )));
2204 }
2205 Self::update_privilege_owners(
2206 &mut cluster.privileges,
2207 cluster.owner_id,
2208 new_owner,
2209 );
2210 cluster.owner_id = new_owner;
2211 tx.update_cluster(*id, cluster.into())?;
2212 }
2213 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2214 let cluster = state.get_cluster(*cluster_id);
2215 let mut replica = cluster
2216 .replica(*replica_id)
2217 .expect("catalog out of sync")
2218 .clone();
2219 if replica_id.is_system() {
2220 return Err(AdapterError::Catalog(Error::new(
2221 ErrorKind::ReadOnlyClusterReplica(replica.name),
2222 )));
2223 }
2224 replica.owner_id = new_owner;
2225 tx.update_cluster_replica(*replica_id, replica.into())?;
2226 }
2227 ObjectId::Database(id) => {
2228 let mut database = state.get_database(id).clone();
2229 if id.is_system() {
2230 return Err(AdapterError::Catalog(Error::new(
2231 ErrorKind::ReadOnlyDatabase(database.name),
2232 )));
2233 }
2234 Self::update_privilege_owners(
2235 &mut database.privileges,
2236 database.owner_id,
2237 new_owner,
2238 );
2239 database.owner_id = new_owner;
2240 tx.update_database(*id, database.clone().into())?;
2241 }
2242 ObjectId::Schema((database_spec, schema_spec)) => {
2243 let schema_id: SchemaId = schema_spec.clone().into();
2244 let mut schema = state
2245 .get_schema(database_spec, schema_spec, conn_id)
2246 .clone();
2247 if schema_id.is_system() {
2248 let name = schema.name();
2249 let full_name = state.resolve_full_schema_name(name);
2250 return Err(AdapterError::Catalog(Error::new(
2251 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2252 )));
2253 }
2254 Self::update_privilege_owners(
2255 &mut schema.privileges,
2256 schema.owner_id,
2257 new_owner,
2258 );
2259 schema.owner_id = new_owner;
2260 tx.update_schema(schema_id, schema.into())?;
2261 }
2262 ObjectId::Item(id) => {
2263 let entry = state.get_entry(id);
2264 let mut new_entry = entry.clone();
2265 if id.is_system() {
2266 let full_name = state.resolve_full_name(
2267 new_entry.name(),
2268 session.map(|session| session.conn_id()),
2269 );
2270 return Err(AdapterError::Catalog(Error::new(
2271 ErrorKind::ReadOnlyItem(full_name.to_string()),
2272 )));
2273 }
2274 Self::update_privilege_owners(
2275 &mut new_entry.privileges,
2276 new_entry.owner_id,
2277 new_owner,
2278 );
2279 new_entry.owner_id = new_owner;
2280 if !new_entry.item().is_temporary() {
2281 tx.update_item(*id, new_entry.into())?;
2282 } else {
2283 temporary_item_updates
2284 .push((entry.clone().into(), StateDiff::Retraction));
2285 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2286 }
2287 }
2288 ObjectId::NetworkPolicy(id) => {
2289 let mut policy = state.get_network_policy(id).clone();
2290 if id.is_system() {
2291 return Err(AdapterError::Catalog(Error::new(
2292 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2293 )));
2294 }
2295 Self::update_privilege_owners(
2296 &mut policy.privileges,
2297 policy.owner_id,
2298 new_owner,
2299 );
2300 policy.owner_id = new_owner;
2301 tx.update_network_policy(*id, policy.into())?;
2302 }
2303 ObjectId::Role(_) => unreachable!("roles have no owner"),
2304 }
2305 let object_type = state.get_object_type(&id);
2306 CatalogState::add_to_audit_log(
2307 &state.system_configuration,
2308 oracle_write_ts,
2309 session,
2310 tx,
2311 audit_events,
2312 EventType::Alter,
2313 object_type_to_audit_object_type(object_type),
2314 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2315 object_id: id.to_string(),
2316 old_owner_id: old_owner.to_string(),
2317 new_owner_id: new_owner.to_string(),
2318 }),
2319 )?;
2320 }
2321 Op::UpdateClusterConfig { id, name, config } => {
2322 let mut cluster = state.get_cluster(id).clone();
2323 cluster.config = config;
2324 tx.update_cluster(id, cluster.into())?;
2325 info!("update cluster {}", name);
2326
2327 CatalogState::add_to_audit_log(
2328 &state.system_configuration,
2329 oracle_write_ts,
2330 session,
2331 tx,
2332 audit_events,
2333 EventType::Alter,
2334 ObjectType::Cluster,
2335 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2336 id: id.to_string(),
2337 name,
2338 }),
2339 )?;
2340 }
2341 Op::UpdateClusterReplicaConfig {
2342 replica_id,
2343 cluster_id,
2344 config,
2345 } => {
2346 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2347 info!("update replica {}", replica.name);
2348 tx.update_cluster_replica(
2349 replica_id,
2350 mz_catalog::durable::ClusterReplica {
2351 cluster_id,
2352 replica_id,
2353 name: replica.name.clone(),
2354 config: config.clone().into(),
2355 owner_id: replica.owner_id,
2356 },
2357 )?;
2358 }
2359 Op::UpdateItem { id, name, to_item } => {
2360 let mut entry = state.get_entry(&id).clone();
2361 entry.name = name.clone();
2362 entry.item = to_item.clone();
2363 tx.update_item(id, entry.into())?;
2364
2365 if Self::should_audit_log_item(&to_item) {
2366 let mut full_name = Self::full_name_detail(
2367 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2368 );
2369 full_name.item = name.item;
2370
2371 CatalogState::add_to_audit_log(
2372 &state.system_configuration,
2373 oracle_write_ts,
2374 session,
2375 tx,
2376 audit_events,
2377 EventType::Alter,
2378 catalog_type_to_audit_object_type(to_item.typ()),
2379 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2380 id: id.to_string(),
2381 name: full_name,
2382 }),
2383 )?;
2384 }
2385
2386 Self::log_update(state, &id);
2387 }
2388 Op::UpdateSystemConfiguration { name, value } => {
2389 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2390 tx.upsert_system_config(&name, parsed_value.clone())?;
2391 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2396 let with_0dt_deployment_max_wait =
2397 Duration::parse(VarInput::Flat(&parsed_value))
2398 .expect("parsing succeeded above");
2399 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2400 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2401 let with_0dt_deployment_ddl_check_interval =
2402 Duration::parse(VarInput::Flat(&parsed_value))
2403 .expect("parsing succeeded above");
2404 tx.set_0dt_deployment_ddl_check_interval(
2405 with_0dt_deployment_ddl_check_interval,
2406 )?;
2407 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2408 let panic_after_timeout =
2409 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2410 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2411 }
2412
2413 CatalogState::add_to_audit_log(
2414 &state.system_configuration,
2415 oracle_write_ts,
2416 session,
2417 tx,
2418 audit_events,
2419 EventType::Alter,
2420 ObjectType::System,
2421 EventDetails::SetV1(mz_audit_log::SetV1 {
2422 name,
2423 value: Some(value.borrow().to_vec().join(", ")),
2424 }),
2425 )?;
2426 }
2427 Op::ResetSystemConfiguration { name } => {
2428 tx.remove_system_config(&name);
2429 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2434 tx.reset_0dt_deployment_max_wait()?;
2435 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2436 tx.reset_0dt_deployment_ddl_check_interval()?;
2437 }
2438
2439 CatalogState::add_to_audit_log(
2440 &state.system_configuration,
2441 oracle_write_ts,
2442 session,
2443 tx,
2444 audit_events,
2445 EventType::Alter,
2446 ObjectType::System,
2447 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2448 )?;
2449 }
2450 Op::ResetAllSystemConfiguration => {
2451 tx.clear_system_configs();
2452 tx.reset_0dt_deployment_max_wait()?;
2453 tx.reset_0dt_deployment_ddl_check_interval()?;
2454
2455 CatalogState::add_to_audit_log(
2456 &state.system_configuration,
2457 oracle_write_ts,
2458 session,
2459 tx,
2460 audit_events,
2461 EventType::Alter,
2462 ObjectType::System,
2463 EventDetails::ResetAllV1,
2464 )?;
2465 }
2466 Op::WeirdStorageUsageUpdates {
2467 object_id,
2468 size_bytes,
2469 collection_timestamp,
2470 } => {
2471 let id = tx.allocate_storage_usage_ids()?;
2472 let metric =
2473 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2474 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2475 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2476 weird_builtin_table_update = Some(builtin_table_update);
2477 }
2478 };
2479 Ok((weird_builtin_table_update, temporary_item_updates))
2480 }
2481
2482 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2483 let entry = state.get_entry(id);
2484 info!(
2485 "update {} {} ({})",
2486 entry.item_type(),
2487 state.resolve_full_name(entry.name(), entry.conn_id()),
2488 id
2489 );
2490 }
2491
2492 fn update_privilege_owners(
2496 privileges: &mut PrivilegeMap,
2497 old_owner: RoleId,
2498 new_owner: RoleId,
2499 ) {
2500 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2502
2503 let mut new_present = false;
2504 for privilege in flat_privileges.iter_mut() {
2505 if privilege.grantor == old_owner {
2508 privilege.grantor = new_owner;
2509 } else if privilege.grantor == new_owner {
2510 new_present = true;
2511 }
2512 if privilege.grantee == old_owner {
2514 privilege.grantee = new_owner;
2515 } else if privilege.grantee == new_owner {
2516 new_present = true;
2517 }
2518 }
2519
2520 if new_present {
2524 let privilege_map: BTreeMap<_, Vec<_>> =
2526 flat_privileges
2527 .into_iter()
2528 .fold(BTreeMap::new(), |mut accum, privilege| {
2529 accum
2530 .entry((privilege.grantee, privilege.grantor))
2531 .or_default()
2532 .push(privilege);
2533 accum
2534 });
2535
2536 flat_privileges = privilege_map
2538 .into_iter()
2539 .map(|((grantee, grantor), values)|
2540 values.into_iter().fold(
2542 MzAclItem::empty(grantee, grantor),
2543 |mut accum, mz_aclitem| {
2544 accum.acl_mode =
2545 accum.acl_mode.union(mz_aclitem.acl_mode);
2546 accum
2547 },
2548 ))
2549 .collect();
2550 }
2551
2552 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2553 }
2554}
2555
2556#[derive(Debug, Default)]
2564pub(crate) struct ObjectsToDrop {
2565 pub comments: BTreeSet<CommentObjectId>,
2566 pub databases: BTreeSet<DatabaseId>,
2567 pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2568 pub clusters: BTreeSet<ClusterId>,
2569 pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2570 pub roles: BTreeSet<RoleId>,
2571 pub items: Vec<CatalogItemId>,
2572 pub network_policies: BTreeSet<NetworkPolicyId>,
2573}
2574
2575impl ObjectsToDrop {
2576 pub fn generate(
2577 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2578 state: &CatalogState,
2579 session: Option<&ConnMeta>,
2580 ) -> Result<Self, AdapterError> {
2581 let mut delta = ObjectsToDrop::default();
2582
2583 for drop_object_info in drop_object_infos {
2584 delta.add_item(drop_object_info, state, session)?;
2585 }
2586
2587 Ok(delta)
2588 }
2589
2590 fn add_item(
2591 &mut self,
2592 drop_object_info: DropObjectInfo,
2593 state: &CatalogState,
2594 session: Option<&ConnMeta>,
2595 ) -> Result<(), AdapterError> {
2596 self.comments
2597 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2598
2599 match drop_object_info {
2600 DropObjectInfo::Database(database_id) => {
2601 let database = &state.database_by_id[&database_id];
2602 if database_id.is_system() {
2603 return Err(AdapterError::Catalog(Error::new(
2604 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2605 )));
2606 }
2607
2608 self.databases.insert(database_id);
2609 }
2610 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2611 let schema = state.get_schema(
2612 &database_spec,
2613 &schema_spec,
2614 session
2615 .map(|session| session.conn_id())
2616 .unwrap_or(&SYSTEM_CONN_ID),
2617 );
2618 let schema_id: SchemaId = schema_spec.into();
2619 if schema_id.is_system() {
2620 let name = schema.name();
2621 let full_name = state.resolve_full_schema_name(name);
2622 return Err(AdapterError::Catalog(Error::new(
2623 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2624 )));
2625 }
2626
2627 self.schemas.insert(schema_spec, database_spec);
2628 }
2629 DropObjectInfo::Role(role_id) => {
2630 let name = state.get_role(&role_id).name().to_string();
2631 if role_id.is_system() || role_id.is_predefined() {
2632 return Err(AdapterError::Catalog(Error::new(
2633 ErrorKind::ReservedRoleName(name.clone()),
2634 )));
2635 }
2636 state.ensure_not_reserved_role(&role_id)?;
2637
2638 self.roles.insert(role_id);
2639 }
2640 DropObjectInfo::Cluster(cluster_id) => {
2641 let cluster = state.get_cluster(cluster_id);
2642 let name = &cluster.name;
2643 if cluster_id.is_system() {
2644 return Err(AdapterError::Catalog(Error::new(
2645 ErrorKind::ReadOnlyCluster(name.clone()),
2646 )));
2647 }
2648
2649 self.clusters.insert(cluster_id);
2650 }
2651 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2652 let cluster = state.get_cluster(cluster_id);
2653 let replica = cluster.replica(replica_id).expect("Must exist");
2654
2655 self.replicas
2656 .insert(replica.replica_id, (cluster.id, reason));
2657 }
2658 DropObjectInfo::Item(item_id) => {
2659 let entry = state.get_entry(&item_id);
2660 if item_id.is_system() {
2661 let name = entry.name();
2662 let full_name =
2663 state.resolve_full_name(name, session.map(|session| session.conn_id()));
2664 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2665 full_name.to_string(),
2666 ))));
2667 }
2668
2669 self.items.push(item_id);
2670 }
2671 DropObjectInfo::NetworkPolicy(network_policy_id) => {
2672 let policy = state.get_network_policy(&network_policy_id);
2673 let name = &policy.name;
2674 if network_policy_id.is_system() {
2675 return Err(AdapterError::Catalog(Error::new(
2676 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2677 )));
2678 }
2679
2680 self.network_policies.insert(network_policy_id);
2681 }
2682 }
2683
2684 Ok(())
2685 }
2686}
2687
2688#[cfg(test)]
2689mod tests {
2690 use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
2691 use mz_repr::role_id::RoleId;
2692
2693 use crate::catalog::Catalog;
2694
2695 #[mz_ore::test]
2696 fn test_update_privilege_owners() {
2697 let old_owner = RoleId::User(1);
2698 let new_owner = RoleId::User(2);
2699 let other_role = RoleId::User(3);
2700
2701 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2703 MzAclItem {
2704 grantee: other_role,
2705 grantor: old_owner,
2706 acl_mode: AclMode::UPDATE,
2707 },
2708 MzAclItem {
2709 grantee: other_role,
2710 grantor: new_owner,
2711 acl_mode: AclMode::SELECT,
2712 },
2713 ]);
2714 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2715 assert_eq!(1, privileges.all_values().count());
2716 assert_eq!(
2717 vec![MzAclItem {
2718 grantee: other_role,
2719 grantor: new_owner,
2720 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2721 }],
2722 privileges.all_values_owned().collect::<Vec<_>>()
2723 );
2724
2725 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2727 MzAclItem {
2728 grantee: old_owner,
2729 grantor: other_role,
2730 acl_mode: AclMode::UPDATE,
2731 },
2732 MzAclItem {
2733 grantee: new_owner,
2734 grantor: other_role,
2735 acl_mode: AclMode::SELECT,
2736 },
2737 ]);
2738 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2739 assert_eq!(1, privileges.all_values().count());
2740 assert_eq!(
2741 vec![MzAclItem {
2742 grantee: new_owner,
2743 grantor: other_role,
2744 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2745 }],
2746 privileges.all_values_owned().collect::<Vec<_>>()
2747 );
2748
2749 let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2751 MzAclItem {
2752 grantee: old_owner,
2753 grantor: old_owner,
2754 acl_mode: AclMode::UPDATE,
2755 },
2756 MzAclItem {
2757 grantee: new_owner,
2758 grantor: new_owner,
2759 acl_mode: AclMode::SELECT,
2760 },
2761 ]);
2762 Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2763 assert_eq!(1, privileges.all_values().count());
2764 assert_eq!(
2765 vec![MzAclItem {
2766 grantee: new_owner,
2767 grantor: new_owner,
2768 acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2769 }],
2770 privileges.all_values_owned().collect::<Vec<_>>()
2771 );
2772 }
2773}