1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22 WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26 ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
34 StateUpdateKind, TemporaryItem,
35};
36use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
37use mz_controller_types::{ClusterId, ReplicaId};
38use mz_ore::collections::HashSet;
39use mz_ore::instrument;
40use mz_ore::now::EpochMillis;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::role_id::RoleId;
45use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
46use mz_sql::ast::RawDataType;
47use mz_sql::catalog::{
48 CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
49 CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
50 RoleAttributesRaw, RoleMembership, RoleVars,
51};
52use mz_sql::names::{
53 CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
54 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
55};
56use mz_sql::plan::{NetworkPolicyRule, PlanError};
57use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
58use mz_sql::session::vars::OwnedVarInput;
59use mz_sql::session::vars::{Value as VarValue, VarInput};
60use mz_sql::{DEFAULT_SCHEMA, rbac};
61use mz_sql_parser::ast::{QualifiedReplica, Value};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{info, trace};
64
65use crate::AdapterError;
66use crate::catalog::state::LocalExpressionCache;
67use crate::catalog::{
68 BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
69 catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
70 is_reserved_role_name, object_type_to_audit_object_type,
71 system_object_type_to_audit_object_type,
72};
73use crate::coord::ConnMeta;
74use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
75use crate::coord::cluster_scheduling::SchedulingDecision;
76use crate::util::ResultExt;
77
78#[derive(Debug, Clone)]
79pub enum Op {
80 AlterRetainHistory {
81 id: CatalogItemId,
82 value: Option<Value>,
83 window: CompactionWindow,
84 },
85 AlterRole {
86 id: RoleId,
87 name: String,
88 attributes: RoleAttributesRaw,
89 nopassword: bool,
90 vars: RoleVars,
91 },
92 AlterNetworkPolicy {
93 id: NetworkPolicyId,
94 rules: Vec<NetworkPolicyRule>,
95 name: String,
96 owner_id: RoleId,
97 },
98 AlterAddColumn {
99 id: CatalogItemId,
100 new_global_id: GlobalId,
101 name: ColumnName,
102 typ: SqlColumnType,
103 sql: RawDataType,
104 },
105 AlterMaterializedViewApplyReplacement {
106 id: CatalogItemId,
107 replacement_id: CatalogItemId,
108 },
109 CreateDatabase {
110 name: String,
111 owner_id: RoleId,
112 },
113 CreateSchema {
114 database_id: ResolvedDatabaseSpecifier,
115 schema_name: String,
116 owner_id: RoleId,
117 },
118 CreateRole {
119 name: String,
120 attributes: RoleAttributesRaw,
121 },
122 CreateCluster {
123 id: ClusterId,
124 name: String,
125 introspection_sources: Vec<&'static BuiltinLog>,
126 owner_id: RoleId,
127 config: ClusterConfig,
128 },
129 CreateClusterReplica {
130 cluster_id: ClusterId,
131 name: String,
132 config: ReplicaConfig,
133 owner_id: RoleId,
134 reason: ReplicaCreateDropReason,
135 },
136 CreateItem {
137 id: CatalogItemId,
138 name: QualifiedItemName,
139 item: CatalogItem,
140 owner_id: RoleId,
141 },
142 CreateNetworkPolicy {
143 rules: Vec<NetworkPolicyRule>,
144 name: String,
145 owner_id: RoleId,
146 },
147 Comment {
148 object_id: CommentObjectId,
149 sub_component: Option<usize>,
150 comment: Option<String>,
151 },
152 DropObjects(Vec<DropObjectInfo>),
153 GrantRole {
154 role_id: RoleId,
155 member_id: RoleId,
156 grantor_id: RoleId,
157 },
158 RenameCluster {
159 id: ClusterId,
160 name: String,
161 to_name: String,
162 check_reserved_names: bool,
163 },
164 RenameClusterReplica {
165 cluster_id: ClusterId,
166 replica_id: ReplicaId,
167 name: QualifiedReplica,
168 to_name: String,
169 },
170 RenameItem {
171 id: CatalogItemId,
172 current_full_name: FullItemName,
173 to_name: String,
174 },
175 RenameSchema {
176 database_spec: ResolvedDatabaseSpecifier,
177 schema_spec: SchemaSpecifier,
178 new_name: String,
179 check_reserved_names: bool,
180 },
181 UpdateOwner {
182 id: ObjectId,
183 new_owner: RoleId,
184 },
185 UpdatePrivilege {
186 target_id: SystemObjectId,
187 privilege: MzAclItem,
188 variant: UpdatePrivilegeVariant,
189 },
190 UpdateDefaultPrivilege {
191 privilege_object: DefaultPrivilegeObject,
192 privilege_acl_item: DefaultPrivilegeAclItem,
193 variant: UpdatePrivilegeVariant,
194 },
195 RevokeRole {
196 role_id: RoleId,
197 member_id: RoleId,
198 grantor_id: RoleId,
199 },
200 UpdateClusterConfig {
201 id: ClusterId,
202 name: String,
203 config: ClusterConfig,
204 },
205 UpdateClusterReplicaConfig {
206 cluster_id: ClusterId,
207 replica_id: ReplicaId,
208 config: ReplicaConfig,
209 },
210 UpdateItem {
211 id: CatalogItemId,
212 name: QualifiedItemName,
213 to_item: CatalogItem,
214 },
215 UpdateSourceReferences {
216 source_id: CatalogItemId,
217 references: SourceReferences,
218 },
219 UpdateSystemConfiguration {
220 name: String,
221 value: OwnedVarInput,
222 },
223 ResetSystemConfiguration {
224 name: String,
225 },
226 ResetAllSystemConfiguration,
227 WeirdStorageUsageUpdates {
234 object_id: Option<String>,
235 size_bytes: u64,
236 collection_timestamp: EpochMillis,
237 },
238 TransactionDryRun,
244}
245
246#[derive(Debug, Clone)]
250pub enum DropObjectInfo {
251 Cluster(ClusterId),
252 ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
253 Database(DatabaseId),
254 Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
255 Role(RoleId),
256 Item(CatalogItemId),
257 NetworkPolicy(NetworkPolicyId),
258}
259
260impl DropObjectInfo {
261 pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
264 match id {
265 ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
266 ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
267 cluster_id,
268 replica_id,
269 ReplicaCreateDropReason::Manual,
270 )),
271 ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
272 ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
273 ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
274 ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
275 ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
276 }
277 }
278
279 fn to_object_id(&self) -> ObjectId {
282 match &self {
283 DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
284 DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
285 ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
286 }
287 DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
288 DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
289 DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
290 DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
291 DropObjectInfo::NetworkPolicy(network_policy_id) => {
292 ObjectId::NetworkPolicy(network_policy_id.clone())
293 }
294 }
295 }
296}
297
298#[derive(Debug, Clone)]
300pub enum ReplicaCreateDropReason {
301 Manual,
306 ClusterScheduling(Vec<SchedulingDecision>),
309}
310
311impl ReplicaCreateDropReason {
312 pub fn into_audit_log(
313 self,
314 ) -> (
315 CreateOrDropClusterReplicaReasonV1,
316 Option<SchedulingDecisionsWithReasonsV2>,
317 ) {
318 let (reason, scheduling_policies) = match self {
319 ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
320 ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
321 CreateOrDropClusterReplicaReasonV1::Schedule,
322 Some(scheduling_decisions),
323 ),
324 };
325 (
326 reason,
327 scheduling_policies
328 .as_ref()
329 .map(SchedulingDecision::reasons_to_audit_log_reasons),
330 )
331 }
332}
333
334pub struct TransactionResult {
335 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
336 pub catalog_updates: Vec<ParsedStateUpdate>,
338 pub audit_events: Vec<VersionedEvent>,
339}
340
341impl Catalog {
342 fn should_audit_log_item(item: &CatalogItem) -> bool {
343 !item.is_temporary()
344 }
345
346 fn temporary_ids(
349 &self,
350 ops: &[Op],
351 temporary_drops: BTreeSet<(&ConnectionId, String)>,
352 ) -> Result<BTreeSet<CatalogItemId>, Error> {
353 let mut creating = BTreeSet::new();
354 let mut temporary_ids = BTreeSet::new();
355 for op in ops.iter() {
356 if let Op::CreateItem {
357 id,
358 name,
359 item,
360 owner_id: _,
361 } = op
362 {
363 if let Some(conn_id) = item.conn_id() {
364 if self.item_exists_in_temp_schemas(conn_id, &name.item)
365 && !temporary_drops.contains(&(conn_id, name.item.clone()))
366 || creating.contains(&(conn_id, &name.item))
367 {
368 return Err(
369 SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
370 );
371 } else {
372 creating.insert((conn_id, &name.item));
373 temporary_ids.insert(id.clone());
374 }
375 }
376 }
377 }
378 Ok(temporary_ids)
379 }
380
381 #[instrument(name = "catalog::transact")]
382 pub async fn transact(
383 &mut self,
384 storage_collections: Option<
387 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
388 >,
389 oracle_write_ts: mz_repr::Timestamp,
390 session: Option<&ConnMeta>,
391 ops: Vec<Op>,
392 ) -> Result<TransactionResult, AdapterError> {
393 trace!("transact: {:?}", ops);
394 fail::fail_point!("catalog_transact", |arg| {
395 Err(AdapterError::Unstructured(anyhow::anyhow!(
396 "failpoint: {arg:?}"
397 )))
398 });
399
400 let drop_ids: BTreeSet<CatalogItemId> = ops
401 .iter()
402 .filter_map(|op| match op {
403 Op::DropObjects(drop_object_infos) => {
404 let ids = drop_object_infos.iter().map(|info| info.to_object_id());
405 let item_ids = ids.filter_map(|id| match id {
406 ObjectId::Item(id) => Some(id),
407 _ => None,
408 });
409 Some(item_ids)
410 }
411 _ => None,
412 })
413 .flatten()
414 .collect();
415 let temporary_drops = drop_ids
416 .iter()
417 .filter_map(|id| {
418 let entry = self.get_entry(id);
419 match entry.item().conn_id() {
420 Some(conn_id) => Some((conn_id, entry.name().item.clone())),
421 None => None,
422 }
423 })
424 .collect();
425 let dropped_global_ids = drop_ids
426 .iter()
427 .flat_map(|item_id| self.get_global_ids(item_id))
428 .collect();
429
430 let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
431 let mut builtin_table_updates = vec![];
432 let mut catalog_updates = vec![];
433 let mut audit_events = vec![];
434 let mut storage = self.storage().await;
435 let mut tx = storage
436 .transaction()
437 .await
438 .unwrap_or_terminate("starting catalog transaction");
439
440 let new_state = Self::transact_inner(
441 storage_collections,
442 oracle_write_ts,
443 session,
444 ops,
445 temporary_ids,
446 &mut builtin_table_updates,
447 &mut catalog_updates,
448 &mut audit_events,
449 &mut tx,
450 &self.state,
451 )
452 .await?;
453
454 tx.commit(oracle_write_ts)
459 .await
460 .unwrap_or_terminate("catalog storage transaction commit must succeed");
461
462 drop(storage);
465 if let Some(new_state) = new_state {
466 self.transient_revision += 1;
467 self.state = new_state;
468 }
469
470 let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
472 if self.state.system_config().enable_mz_notices() {
473 self.state().pack_optimizer_notices(
475 &mut builtin_table_updates,
476 dropped_notices.iter(),
477 Diff::MINUS_ONE,
478 );
479 }
480
481 Ok(TransactionResult {
482 builtin_table_updates,
483 catalog_updates,
484 audit_events,
485 })
486 }
487
488 #[instrument(name = "catalog::transact_inner")]
496 async fn transact_inner(
497 storage_collections: Option<
498 &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
499 >,
500 oracle_write_ts: mz_repr::Timestamp,
501 session: Option<&ConnMeta>,
502 mut ops: Vec<Op>,
503 temporary_ids: BTreeSet<CatalogItemId>,
504 builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
505 parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
506 audit_events: &mut Vec<VersionedEvent>,
507 tx: &mut Transaction<'_>,
508 state: &CatalogState,
509 ) -> Result<Option<CatalogState>, AdapterError> {
510 let mut preliminary_state = Cow::Borrowed(state);
537
538 let mut state = Cow::Borrowed(state);
540
541 let dry_run_ops = match ops.last() {
542 Some(Op::TransactionDryRun) => {
543 ops.pop();
545 assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
546 ops.clone()
547 }
548 Some(_) => vec![],
549 None => return Ok(None),
550 };
551
552 let mut storage_collections_to_create = BTreeSet::new();
553 let mut storage_collections_to_drop = BTreeSet::new();
554 let mut storage_collections_to_register = BTreeMap::new();
555
556 let mut updates = Vec::new();
557
558 for op in ops {
559 let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
560 oracle_write_ts,
561 session,
562 op,
563 &temporary_ids,
564 audit_events,
565 tx,
566 &*preliminary_state,
567 &mut storage_collections_to_create,
568 &mut storage_collections_to_drop,
569 &mut storage_collections_to_register,
570 )
571 .await?;
572
573 if let Some(builtin_table_update) = weird_builtin_table_update {
582 builtin_table_updates.push(builtin_table_update);
583 }
584
585 let upper = tx.upper();
590 let temporary_item_updates =
591 temporary_item_updates
592 .into_iter()
593 .map(|(item, diff)| StateUpdate {
594 kind: StateUpdateKind::TemporaryItem(item),
595 ts: upper,
596 diff,
597 });
598
599 let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
600 op_updates.extend(temporary_item_updates);
601 if !op_updates.is_empty() {
602 let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
603 .to_mut()
604 .apply_updates(op_updates.clone(), &mut LocalExpressionCache::Closed)
605 .await;
606 }
607 updates.append(&mut op_updates);
608 }
609
610 if !updates.is_empty() {
611 let (op_builtin_table_updates, op_catalog_updates) = state
612 .to_mut()
613 .apply_updates(updates.clone(), &mut LocalExpressionCache::Closed)
614 .await;
615 let op_builtin_table_updates = state
616 .to_mut()
617 .resolve_builtin_table_updates(op_builtin_table_updates);
618 builtin_table_updates.extend(op_builtin_table_updates);
619 parsed_catalog_updates.extend(op_catalog_updates);
620 }
621
622 if dry_run_ops.is_empty() {
623 if let Some(c) = storage_collections {
625 c.prepare_state(
626 tx,
627 storage_collections_to_create,
628 storage_collections_to_drop,
629 storage_collections_to_register,
630 )
631 .await?;
632 }
633
634 let updates = tx.get_and_commit_op_updates();
635 if !updates.is_empty() {
636 let (op_builtin_table_updates, op_catalog_updates) = state
637 .to_mut()
638 .apply_updates(updates.clone(), &mut LocalExpressionCache::Closed)
639 .await;
640 let op_builtin_table_updates = state
641 .to_mut()
642 .resolve_builtin_table_updates(op_builtin_table_updates);
643 builtin_table_updates.extend(op_builtin_table_updates);
644 parsed_catalog_updates.extend(op_catalog_updates);
645 }
646
647 match state {
648 Cow::Owned(state) => Ok(Some(state)),
649 Cow::Borrowed(_) => Ok(None),
650 }
651 } else {
652 Err(AdapterError::TransactionDryRun {
653 new_ops: dry_run_ops,
654 new_state: state.into_owned(),
655 })
656 }
657 }
658
659 #[instrument]
667 async fn transact_op(
668 oracle_write_ts: mz_repr::Timestamp,
669 session: Option<&ConnMeta>,
670 op: Op,
671 temporary_ids: &BTreeSet<CatalogItemId>,
672 audit_events: &mut Vec<VersionedEvent>,
673 tx: &mut Transaction<'_>,
674 state: &CatalogState,
675 storage_collections_to_create: &mut BTreeSet<GlobalId>,
676 storage_collections_to_drop: &mut BTreeSet<GlobalId>,
677 storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
678 ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
679 let mut weird_builtin_table_update = None;
680 let mut temporary_item_updates = Vec::new();
681
682 match op {
683 Op::TransactionDryRun => {
684 unreachable!("TransactionDryRun can only be used a final element of ops")
685 }
686 Op::AlterRetainHistory { id, value, window } => {
687 let entry = state.get_entry(&id);
688 if id.is_system() {
689 let name = entry.name();
690 let full_name =
691 state.resolve_full_name(name, session.map(|session| session.conn_id()));
692 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
693 full_name.to_string(),
694 ))));
695 }
696
697 let mut new_entry = entry.clone();
698 let previous = new_entry
699 .item
700 .update_retain_history(value.clone(), window)
701 .map_err(|_| {
702 AdapterError::Catalog(Error::new(ErrorKind::Internal(
703 "planner should have rejected invalid alter retain history item type"
704 .to_string(),
705 )))
706 })?;
707
708 if Self::should_audit_log_item(new_entry.item()) {
709 let details =
710 EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
711 id: id.to_string(),
712 old_history: previous.map(|previous| previous.to_string()),
713 new_history: value.map(|v| v.to_string()),
714 });
715 CatalogState::add_to_audit_log(
716 &state.system_configuration,
717 oracle_write_ts,
718 session,
719 tx,
720 audit_events,
721 EventType::Alter,
722 catalog_type_to_audit_object_type(new_entry.item().typ()),
723 details,
724 )?;
725 }
726
727 tx.update_item(id, new_entry.into())?;
728
729 Self::log_update(state, &id);
730 }
731 Op::AlterRole {
732 id,
733 name,
734 attributes,
735 nopassword,
736 vars,
737 } => {
738 state.ensure_not_reserved_role(&id)?;
739
740 let mut existing_role = state.get_role(&id).clone();
741 let password = attributes.password.clone();
742 let scram_iterations = attributes
743 .scram_iterations
744 .unwrap_or_else(|| state.system_config().scram_iterations());
745 existing_role.attributes = attributes.into();
746 existing_role.vars = vars;
747 let password_action = if nopassword {
748 PasswordAction::Clear
749 } else if let Some(password) = password {
750 PasswordAction::Set(PasswordConfig {
751 password,
752 scram_iterations,
753 })
754 } else {
755 PasswordAction::NoChange
756 };
757 tx.update_role(id, existing_role.into(), password_action)?;
758
759 CatalogState::add_to_audit_log(
760 &state.system_configuration,
761 oracle_write_ts,
762 session,
763 tx,
764 audit_events,
765 EventType::Alter,
766 ObjectType::Role,
767 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
768 id: id.to_string(),
769 name: name.clone(),
770 }),
771 )?;
772
773 info!("update role {name} ({id})");
774 }
775 Op::AlterNetworkPolicy {
776 id,
777 rules,
778 name,
779 owner_id: _owner_id,
780 } => {
781 let existing_policy = state.get_network_policy(&id).clone();
782 let mut policy: NetworkPolicy = existing_policy.into();
783 policy.rules = rules;
784 if is_reserved_name(&name) {
785 return Err(AdapterError::Catalog(Error::new(
786 ErrorKind::ReservedNetworkPolicyName(name),
787 )));
788 }
789 tx.update_network_policy(id, policy.clone())?;
790
791 CatalogState::add_to_audit_log(
792 &state.system_configuration,
793 oracle_write_ts,
794 session,
795 tx,
796 audit_events,
797 EventType::Alter,
798 ObjectType::NetworkPolicy,
799 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
800 id: id.to_string(),
801 name: name.clone(),
802 }),
803 )?;
804
805 info!("update network policy {name} ({id})");
806 }
807 Op::AlterAddColumn {
808 id,
809 new_global_id,
810 name,
811 typ,
812 sql,
813 } => {
814 let mut new_entry = state.get_entry(&id).clone();
815 let version = new_entry.item.add_column(name, typ, sql)?;
816 let shard_id = state
819 .storage_metadata()
820 .get_collection_shard(new_entry.latest_global_id())?;
821
822 let CatalogItem::Table(table) = &mut new_entry.item else {
824 return Err(AdapterError::Unsupported("adding columns to non-Table"));
825 };
826 table.collections.insert(version, new_global_id);
827
828 tx.update_item(id, new_entry.into())?;
829 storage_collections_to_register.insert(new_global_id, shard_id);
830 }
831 Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
832 let mut new_entry = state.get_entry(&id).clone();
833 let replacement = state.get_entry(&replacement_id);
834
835 let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
836 return Err(AdapterError::internal(
837 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
838 "id must refer to a materialized view",
839 ));
840 };
841 let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
842 return Err(AdapterError::internal(
843 "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
844 "replacement_id must refer to a materialized view",
845 ));
846 };
847
848 mv.apply_replacement(replacement_mv.clone());
849
850 tx.remove_item(replacement_id)?;
851 tx.update_item(id, new_entry.into())?;
852
853 }
855 Op::CreateDatabase { name, owner_id } => {
856 let database_owner_privileges = vec![rbac::owner_privilege(
857 mz_sql::catalog::ObjectType::Database,
858 owner_id,
859 )];
860 let database_default_privileges = state
861 .default_privileges
862 .get_applicable_privileges(
863 owner_id,
864 None,
865 None,
866 mz_sql::catalog::ObjectType::Database,
867 )
868 .map(|item| item.mz_acl_item(owner_id));
869 let database_privileges: Vec<_> = merge_mz_acl_items(
870 database_owner_privileges
871 .into_iter()
872 .chain(database_default_privileges),
873 )
874 .collect();
875
876 let schema_owner_privileges = vec![rbac::owner_privilege(
877 mz_sql::catalog::ObjectType::Schema,
878 owner_id,
879 )];
880 let schema_default_privileges = state
881 .default_privileges
882 .get_applicable_privileges(
883 owner_id,
884 None,
885 None,
886 mz_sql::catalog::ObjectType::Schema,
887 )
888 .map(|item| item.mz_acl_item(owner_id))
889 .chain(std::iter::once(MzAclItem {
891 grantee: RoleId::Public,
892 grantor: owner_id,
893 acl_mode: AclMode::USAGE,
894 }));
895 let schema_privileges: Vec<_> = merge_mz_acl_items(
896 schema_owner_privileges
897 .into_iter()
898 .chain(schema_default_privileges),
899 )
900 .collect();
901
902 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
903 let (database_id, _) = tx.insert_user_database(
904 &name,
905 owner_id,
906 database_privileges.clone(),
907 &temporary_oids,
908 )?;
909 let (schema_id, _) = tx.insert_user_schema(
910 database_id,
911 DEFAULT_SCHEMA,
912 owner_id,
913 schema_privileges.clone(),
914 &temporary_oids,
915 )?;
916 CatalogState::add_to_audit_log(
917 &state.system_configuration,
918 oracle_write_ts,
919 session,
920 tx,
921 audit_events,
922 EventType::Create,
923 ObjectType::Database,
924 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
925 id: database_id.to_string(),
926 name: name.clone(),
927 }),
928 )?;
929 info!("create database {}", name);
930
931 CatalogState::add_to_audit_log(
932 &state.system_configuration,
933 oracle_write_ts,
934 session,
935 tx,
936 audit_events,
937 EventType::Create,
938 ObjectType::Schema,
939 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
940 id: schema_id.to_string(),
941 name: DEFAULT_SCHEMA.to_string(),
942 database_name: Some(name),
943 }),
944 )?;
945 }
946 Op::CreateSchema {
947 database_id,
948 schema_name,
949 owner_id,
950 } => {
951 if is_reserved_name(&schema_name) {
952 return Err(AdapterError::Catalog(Error::new(
953 ErrorKind::ReservedSchemaName(schema_name),
954 )));
955 }
956 let database_id = match database_id {
957 ResolvedDatabaseSpecifier::Id(id) => id,
958 ResolvedDatabaseSpecifier::Ambient => {
959 return Err(AdapterError::Catalog(Error::new(
960 ErrorKind::ReadOnlySystemSchema(schema_name),
961 )));
962 }
963 };
964 let owner_privileges = vec![rbac::owner_privilege(
965 mz_sql::catalog::ObjectType::Schema,
966 owner_id,
967 )];
968 let default_privileges = state
969 .default_privileges
970 .get_applicable_privileges(
971 owner_id,
972 Some(database_id),
973 None,
974 mz_sql::catalog::ObjectType::Schema,
975 )
976 .map(|item| item.mz_acl_item(owner_id));
977 let privileges: Vec<_> =
978 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
979 .collect();
980 let (schema_id, _) = tx.insert_user_schema(
981 database_id,
982 &schema_name,
983 owner_id,
984 privileges.clone(),
985 &state.get_temporary_oids().collect(),
986 )?;
987 CatalogState::add_to_audit_log(
988 &state.system_configuration,
989 oracle_write_ts,
990 session,
991 tx,
992 audit_events,
993 EventType::Create,
994 ObjectType::Schema,
995 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
996 id: schema_id.to_string(),
997 name: schema_name.clone(),
998 database_name: Some(state.database_by_id[&database_id].name.clone()),
999 }),
1000 )?;
1001 }
1002 Op::CreateRole { name, attributes } => {
1003 if is_reserved_role_name(&name) {
1004 return Err(AdapterError::Catalog(Error::new(
1005 ErrorKind::ReservedRoleName(name),
1006 )));
1007 }
1008 let membership = RoleMembership::new();
1009 let vars = RoleVars::default();
1010 let (id, _) = tx.insert_user_role(
1011 name.clone(),
1012 attributes.clone(),
1013 membership.clone(),
1014 vars.clone(),
1015 &state.get_temporary_oids().collect(),
1016 )?;
1017 CatalogState::add_to_audit_log(
1018 &state.system_configuration,
1019 oracle_write_ts,
1020 session,
1021 tx,
1022 audit_events,
1023 EventType::Create,
1024 ObjectType::Role,
1025 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1026 id: id.to_string(),
1027 name: name.clone(),
1028 }),
1029 )?;
1030 info!("create role {}", name);
1031 }
1032 Op::CreateCluster {
1033 id,
1034 name,
1035 introspection_sources,
1036 owner_id,
1037 config,
1038 } => {
1039 if is_reserved_name(&name) {
1040 return Err(AdapterError::Catalog(Error::new(
1041 ErrorKind::ReservedClusterName(name),
1042 )));
1043 }
1044 let owner_privileges = vec![rbac::owner_privilege(
1045 mz_sql::catalog::ObjectType::Cluster,
1046 owner_id,
1047 )];
1048 let default_privileges = state
1049 .default_privileges
1050 .get_applicable_privileges(
1051 owner_id,
1052 None,
1053 None,
1054 mz_sql::catalog::ObjectType::Cluster,
1055 )
1056 .map(|item| item.mz_acl_item(owner_id));
1057 let privileges: Vec<_> =
1058 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1059 .collect();
1060 let introspection_source_ids: Vec<_> = introspection_sources
1061 .iter()
1062 .map(|introspection_source| {
1063 Transaction::allocate_introspection_source_index_id(
1064 &id,
1065 introspection_source.variant,
1066 )
1067 })
1068 .collect();
1069
1070 let introspection_sources = introspection_sources
1071 .into_iter()
1072 .zip_eq(introspection_source_ids)
1073 .map(|(log, (item_id, gid))| (log, item_id, gid))
1074 .collect();
1075
1076 tx.insert_user_cluster(
1077 id,
1078 &name,
1079 introspection_sources,
1080 owner_id,
1081 privileges.clone(),
1082 config.clone().into(),
1083 &state.get_temporary_oids().collect(),
1084 )?;
1085 CatalogState::add_to_audit_log(
1086 &state.system_configuration,
1087 oracle_write_ts,
1088 session,
1089 tx,
1090 audit_events,
1091 EventType::Create,
1092 ObjectType::Cluster,
1093 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1094 id: id.to_string(),
1095 name: name.clone(),
1096 }),
1097 )?;
1098 info!("create cluster {}", name);
1099 }
1100 Op::CreateClusterReplica {
1101 cluster_id,
1102 name,
1103 config,
1104 owner_id,
1105 reason,
1106 } => {
1107 if is_reserved_name(&name) {
1108 return Err(AdapterError::Catalog(Error::new(
1109 ErrorKind::ReservedReplicaName(name),
1110 )));
1111 }
1112 let cluster = state.get_cluster(cluster_id);
1113 let id =
1114 tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1115 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1116 size,
1117 billed_as,
1118 internal,
1119 ..
1120 }) = &config.location
1121 {
1122 let (reason, scheduling_policies) = reason.into_audit_log();
1123 let details = EventDetails::CreateClusterReplicaV4(
1124 mz_audit_log::CreateClusterReplicaV4 {
1125 cluster_id: cluster_id.to_string(),
1126 cluster_name: cluster.name.clone(),
1127 replica_id: Some(id.to_string()),
1128 replica_name: name.clone(),
1129 logical_size: size.clone(),
1130 billed_as: billed_as.clone(),
1131 internal: *internal,
1132 reason,
1133 scheduling_policies,
1134 },
1135 );
1136 CatalogState::add_to_audit_log(
1137 &state.system_configuration,
1138 oracle_write_ts,
1139 session,
1140 tx,
1141 audit_events,
1142 EventType::Create,
1143 ObjectType::ClusterReplica,
1144 details,
1145 )?;
1146 }
1147 }
1148 Op::CreateItem {
1149 id,
1150 name,
1151 item,
1152 owner_id,
1153 } => {
1154 state.check_unstable_dependencies(&item)?;
1155
1156 match &item {
1157 CatalogItem::Table(table) => {
1158 let gids: Vec<_> = table.global_ids().collect();
1159 assert_eq!(gids.len(), 1);
1160 storage_collections_to_create.extend(gids);
1161 }
1162 CatalogItem::Source(source) => {
1163 storage_collections_to_create.insert(source.global_id());
1164 }
1165 CatalogItem::MaterializedView(mv) => {
1166 let mv_gid = mv.global_id_writes();
1167 if let Some(target_id) = mv.replacement_target {
1168 let target_gid = state.get_entry(&target_id).latest_global_id();
1169 let shard_id =
1170 state.storage_metadata().get_collection_shard(target_gid)?;
1171 storage_collections_to_register.insert(mv_gid, shard_id);
1172 } else {
1173 storage_collections_to_create.insert(mv_gid);
1174 }
1175 }
1176 CatalogItem::ContinualTask(ct) => {
1177 storage_collections_to_create.insert(ct.global_id());
1178 }
1179 CatalogItem::Sink(sink) => {
1180 storage_collections_to_create.insert(sink.global_id());
1181 }
1182 CatalogItem::Log(_)
1183 | CatalogItem::View(_)
1184 | CatalogItem::Index(_)
1185 | CatalogItem::Type(_)
1186 | CatalogItem::Func(_)
1187 | CatalogItem::Secret(_)
1188 | CatalogItem::Connection(_) => (),
1189 }
1190
1191 let system_user = session.map_or(false, |s| s.user().is_system_user());
1192 if !system_user {
1193 if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1194 let cluster_name = state.clusters_by_id[&id].name.clone();
1195 return Err(AdapterError::Catalog(Error::new(
1196 ErrorKind::ReadOnlyCluster(cluster_name),
1197 )));
1198 }
1199 }
1200
1201 let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1202 let default_privileges = state
1203 .default_privileges
1204 .get_applicable_privileges(
1205 owner_id,
1206 name.qualifiers.database_spec.id(),
1207 Some(name.qualifiers.schema_spec.into()),
1208 item.typ().into(),
1209 )
1210 .map(|item| item.mz_acl_item(owner_id));
1211 let progress_source_privilege = if item.is_progress_source() {
1213 Some(MzAclItem {
1214 grantee: MZ_SUPPORT_ROLE_ID,
1215 grantor: owner_id,
1216 acl_mode: AclMode::SELECT,
1217 })
1218 } else {
1219 None
1220 };
1221 let privileges: Vec<_> = merge_mz_acl_items(
1222 owner_privileges
1223 .into_iter()
1224 .chain(default_privileges)
1225 .chain(progress_source_privilege),
1226 )
1227 .collect();
1228
1229 let temporary_oids = state.get_temporary_oids().collect();
1230
1231 if item.is_temporary() {
1232 if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1233 || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1234 {
1235 return Err(AdapterError::Catalog(Error::new(
1236 ErrorKind::InvalidTemporarySchema,
1237 )));
1238 }
1239 let oid = tx.allocate_oid(&temporary_oids)?;
1240
1241 let schema_id = name.qualifiers.schema_spec.clone().into();
1242 let item_type = item.typ();
1243 let (create_sql, global_id, versions) = item.to_serialized();
1244
1245 let item = TemporaryItem {
1246 id,
1247 oid,
1248 global_id,
1249 schema_id,
1250 name: name.item.clone(),
1251 create_sql,
1252 conn_id: item.conn_id().cloned(),
1253 owner_id,
1254 privileges: privileges.clone(),
1255 extra_versions: versions,
1256 };
1257 temporary_item_updates.push((item, StateDiff::Addition));
1258
1259 info!(
1260 "create temporary {} {} ({})",
1261 item_type,
1262 state.resolve_full_name(&name, None),
1263 id
1264 );
1265 } else {
1266 if let Some(temp_id) =
1267 item.uses()
1268 .iter()
1269 .find(|id| match state.try_get_entry(*id) {
1270 Some(entry) => entry.item().is_temporary(),
1271 None => temporary_ids.contains(id),
1272 })
1273 {
1274 let temp_item = state.get_entry(temp_id);
1275 return Err(AdapterError::Catalog(Error::new(
1276 ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1277 )));
1278 }
1279 if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1280 && !system_user
1281 {
1282 let schema_name = state
1283 .resolve_full_name(&name, session.map(|session| session.conn_id()))
1284 .schema;
1285 return Err(AdapterError::Catalog(Error::new(
1286 ErrorKind::ReadOnlySystemSchema(schema_name),
1287 )));
1288 }
1289 let schema_id = name.qualifiers.schema_spec.clone().into();
1290 let item_type = item.typ();
1291 let (create_sql, global_id, versions) = item.to_serialized();
1292 tx.insert_user_item(
1293 id,
1294 global_id,
1295 schema_id,
1296 &name.item,
1297 create_sql,
1298 owner_id,
1299 privileges.clone(),
1300 &temporary_oids,
1301 versions,
1302 )?;
1303 info!(
1304 "create {} {} ({})",
1305 item_type,
1306 state.resolve_full_name(&name, None),
1307 id
1308 );
1309 }
1310
1311 if Self::should_audit_log_item(&item) {
1312 let name = Self::full_name_detail(
1313 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1314 );
1315 let details = match &item {
1316 CatalogItem::Source(s) => {
1317 let cluster_id = match s.data_source {
1318 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1321 match state.get_entry(&ingestion_id).cluster_id() {
1322 Some(cluster_id) => Some(cluster_id.to_string()),
1323 None => None,
1324 }
1325 }
1326 _ => match item.cluster_id() {
1327 Some(cluster_id) => Some(cluster_id.to_string()),
1328 None => None,
1329 },
1330 };
1331
1332 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1333 id: id.to_string(),
1334 cluster_id,
1335 name,
1336 external_type: s.source_type().to_string(),
1337 })
1338 }
1339 CatalogItem::Sink(s) => {
1340 EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1341 id: id.to_string(),
1342 cluster_id: Some(s.cluster_id.to_string()),
1343 name,
1344 external_type: s.sink_type().to_string(),
1345 })
1346 }
1347 CatalogItem::Index(i) => {
1348 EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1349 id: id.to_string(),
1350 name,
1351 cluster_id: i.cluster_id.to_string(),
1352 })
1353 }
1354 CatalogItem::MaterializedView(mv) => {
1355 EventDetails::CreateMaterializedViewV1(
1356 mz_audit_log::CreateMaterializedViewV1 {
1357 id: id.to_string(),
1358 name,
1359 cluster_id: mv.cluster_id.to_string(),
1360 },
1361 )
1362 }
1363 _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1364 id: id.to_string(),
1365 name,
1366 }),
1367 };
1368 CatalogState::add_to_audit_log(
1369 &state.system_configuration,
1370 oracle_write_ts,
1371 session,
1372 tx,
1373 audit_events,
1374 EventType::Create,
1375 catalog_type_to_audit_object_type(item.typ()),
1376 details,
1377 )?;
1378 }
1379 }
1380 Op::CreateNetworkPolicy {
1381 rules,
1382 name,
1383 owner_id,
1384 } => {
1385 if state.network_policies_by_name.contains_key(&name) {
1386 return Err(AdapterError::PlanError(PlanError::Catalog(
1387 SqlCatalogError::NetworkPolicyAlreadyExists(name),
1388 )));
1389 }
1390 if is_reserved_name(&name) {
1391 return Err(AdapterError::Catalog(Error::new(
1392 ErrorKind::ReservedNetworkPolicyName(name),
1393 )));
1394 }
1395
1396 let owner_privileges = vec![rbac::owner_privilege(
1397 mz_sql::catalog::ObjectType::NetworkPolicy,
1398 owner_id,
1399 )];
1400 let default_privileges = state
1401 .default_privileges
1402 .get_applicable_privileges(
1403 owner_id,
1404 None,
1405 None,
1406 mz_sql::catalog::ObjectType::NetworkPolicy,
1407 )
1408 .map(|item| item.mz_acl_item(owner_id));
1409 let privileges: Vec<_> =
1410 merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1411 .collect();
1412
1413 let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1414 let id = tx.insert_user_network_policy(
1415 name.clone(),
1416 rules,
1417 privileges,
1418 owner_id,
1419 &temporary_oids,
1420 )?;
1421
1422 CatalogState::add_to_audit_log(
1423 &state.system_configuration,
1424 oracle_write_ts,
1425 session,
1426 tx,
1427 audit_events,
1428 EventType::Create,
1429 ObjectType::NetworkPolicy,
1430 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1431 id: id.to_string(),
1432 name: name.clone(),
1433 }),
1434 )?;
1435
1436 info!("created network policy {name} ({id})");
1437 }
1438 Op::Comment {
1439 object_id,
1440 sub_component,
1441 comment,
1442 } => {
1443 tx.update_comment(object_id, sub_component, comment)?;
1444 let entry = state.get_comment_id_entry(&object_id);
1445 let should_log = entry
1446 .map(|entry| Self::should_audit_log_item(entry.item()))
1447 .unwrap_or(true);
1449 if let (Some(conn_id), true) =
1452 (session.map(|session| session.conn_id()), should_log)
1453 {
1454 CatalogState::add_to_audit_log(
1455 &state.system_configuration,
1456 oracle_write_ts,
1457 session,
1458 tx,
1459 audit_events,
1460 EventType::Comment,
1461 comment_id_to_audit_object_type(object_id),
1462 EventDetails::IdNameV1(IdNameV1 {
1463 id: format!("{object_id:?}"),
1465 name: state.comment_id_to_audit_log_name(object_id, conn_id),
1466 }),
1467 )?;
1468 }
1469 }
1470 Op::UpdateSourceReferences {
1471 source_id,
1472 references,
1473 } => {
1474 tx.update_source_references(
1475 source_id,
1476 references
1477 .references
1478 .into_iter()
1479 .map(|reference| reference.into())
1480 .collect(),
1481 references.updated_at,
1482 )?;
1483 }
1484 Op::DropObjects(drop_object_infos) => {
1485 let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1487
1488 tx.drop_comments(&delta.comments)?;
1490
1491 let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1493 delta
1494 .items
1495 .iter()
1496 .map(|id| id)
1497 .partition(|id| !state.get_entry(*id).item().is_temporary());
1498 tx.remove_items(&durable_items_to_drop)?;
1499 temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1500 let entry = state.get_entry(&id);
1501 (entry.clone().into(), StateDiff::Retraction)
1502 }));
1503
1504 for item_id in delta.items {
1505 let entry = state.get_entry(&item_id);
1506
1507 if entry.item().is_storage_collection() {
1508 storage_collections_to_drop.extend(entry.global_ids());
1509 }
1510
1511 if state.source_references.contains_key(&item_id) {
1512 tx.remove_source_references(item_id)?;
1513 }
1514
1515 if Self::should_audit_log_item(entry.item()) {
1516 CatalogState::add_to_audit_log(
1517 &state.system_configuration,
1518 oracle_write_ts,
1519 session,
1520 tx,
1521 audit_events,
1522 EventType::Drop,
1523 catalog_type_to_audit_object_type(entry.item().typ()),
1524 EventDetails::IdFullNameV1(IdFullNameV1 {
1525 id: item_id.to_string(),
1526 name: Self::full_name_detail(&state.resolve_full_name(
1527 entry.name(),
1528 session.map(|session| session.conn_id()),
1529 )),
1530 }),
1531 )?;
1532 }
1533 info!(
1534 "drop {} {} ({})",
1535 entry.item_type(),
1536 state.resolve_full_name(entry.name(), entry.conn_id()),
1537 item_id
1538 );
1539 }
1540
1541 let schemas = delta
1543 .schemas
1544 .iter()
1545 .map(|(schema_spec, database_spec)| {
1546 (SchemaId::from(schema_spec), *database_spec)
1547 })
1548 .collect();
1549 tx.remove_schemas(&schemas)?;
1550
1551 for (schema_spec, database_spec) in delta.schemas {
1552 let schema = state.get_schema(
1553 &database_spec,
1554 &schema_spec,
1555 session
1556 .map(|session| session.conn_id())
1557 .unwrap_or(&SYSTEM_CONN_ID),
1558 );
1559
1560 let schema_id = SchemaId::from(schema_spec);
1561 let database_id = match database_spec {
1562 ResolvedDatabaseSpecifier::Ambient => None,
1563 ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1564 };
1565
1566 CatalogState::add_to_audit_log(
1567 &state.system_configuration,
1568 oracle_write_ts,
1569 session,
1570 tx,
1571 audit_events,
1572 EventType::Drop,
1573 ObjectType::Schema,
1574 EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1575 id: schema_id.to_string(),
1576 name: schema.name.schema.to_string(),
1577 database_name: database_id
1578 .map(|database_id| state.database_by_id[&database_id].name.clone()),
1579 }),
1580 )?;
1581 }
1582
1583 tx.remove_databases(&delta.databases)?;
1585
1586 for database_id in delta.databases {
1587 let database = state.get_database(&database_id).clone();
1588
1589 CatalogState::add_to_audit_log(
1590 &state.system_configuration,
1591 oracle_write_ts,
1592 session,
1593 tx,
1594 audit_events,
1595 EventType::Drop,
1596 ObjectType::Database,
1597 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1598 id: database_id.to_string(),
1599 name: database.name.clone(),
1600 }),
1601 )?;
1602 }
1603
1604 tx.remove_user_roles(&delta.roles)?;
1606
1607 for role_id in delta.roles {
1608 let role = state
1609 .roles_by_id
1610 .get(&role_id)
1611 .expect("catalog out of sync");
1612
1613 CatalogState::add_to_audit_log(
1614 &state.system_configuration,
1615 oracle_write_ts,
1616 session,
1617 tx,
1618 audit_events,
1619 EventType::Drop,
1620 ObjectType::Role,
1621 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1622 id: role.id.to_string(),
1623 name: role.name.clone(),
1624 }),
1625 )?;
1626 info!("drop role {}", role.name());
1627 }
1628
1629 tx.remove_network_policies(&delta.network_policies)?;
1631
1632 for network_policy_id in delta.network_policies {
1633 let policy = state
1634 .network_policies_by_id
1635 .get(&network_policy_id)
1636 .expect("catalog out of sync");
1637
1638 CatalogState::add_to_audit_log(
1639 &state.system_configuration,
1640 oracle_write_ts,
1641 session,
1642 tx,
1643 audit_events,
1644 EventType::Drop,
1645 ObjectType::NetworkPolicy,
1646 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1647 id: policy.id.to_string(),
1648 name: policy.name.clone(),
1649 }),
1650 )?;
1651 info!("drop network policy {}", policy.name.clone());
1652 }
1653
1654 let replicas = delta.replicas.keys().copied().collect();
1656 tx.remove_cluster_replicas(&replicas)?;
1657
1658 for (replica_id, (cluster_id, reason)) in delta.replicas {
1659 let cluster = state.get_cluster(cluster_id);
1660 let replica = cluster.replica(replica_id).expect("Must exist");
1661
1662 let (reason, scheduling_policies) = reason.into_audit_log();
1663 let details =
1664 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1665 cluster_id: cluster_id.to_string(),
1666 cluster_name: cluster.name.clone(),
1667 replica_id: Some(replica_id.to_string()),
1668 replica_name: replica.name.clone(),
1669 reason,
1670 scheduling_policies,
1671 });
1672 CatalogState::add_to_audit_log(
1673 &state.system_configuration,
1674 oracle_write_ts,
1675 session,
1676 tx,
1677 audit_events,
1678 EventType::Drop,
1679 ObjectType::ClusterReplica,
1680 details,
1681 )?;
1682 }
1683
1684 tx.remove_clusters(&delta.clusters)?;
1686
1687 for cluster_id in delta.clusters {
1688 let cluster = state.get_cluster(cluster_id);
1689
1690 CatalogState::add_to_audit_log(
1691 &state.system_configuration,
1692 oracle_write_ts,
1693 session,
1694 tx,
1695 audit_events,
1696 EventType::Drop,
1697 ObjectType::Cluster,
1698 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1699 id: cluster.id.to_string(),
1700 name: cluster.name.clone(),
1701 }),
1702 )?;
1703 }
1704 }
1705 Op::GrantRole {
1706 role_id,
1707 member_id,
1708 grantor_id,
1709 } => {
1710 state.ensure_not_reserved_role(&member_id)?;
1711 state.ensure_grantable_role(&role_id)?;
1712 if state.collect_role_membership(&role_id).contains(&member_id) {
1713 let group_role = state.get_role(&role_id);
1714 let member_role = state.get_role(&member_id);
1715 return Err(AdapterError::Catalog(Error::new(
1716 ErrorKind::CircularRoleMembership {
1717 role_name: group_role.name().to_string(),
1718 member_name: member_role.name().to_string(),
1719 },
1720 )));
1721 }
1722 let mut member_role = state.get_role(&member_id).clone();
1723 member_role.membership.map.insert(role_id, grantor_id);
1724 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1725
1726 CatalogState::add_to_audit_log(
1727 &state.system_configuration,
1728 oracle_write_ts,
1729 session,
1730 tx,
1731 audit_events,
1732 EventType::Grant,
1733 ObjectType::Role,
1734 EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1735 role_id: role_id.to_string(),
1736 member_id: member_id.to_string(),
1737 grantor_id: grantor_id.to_string(),
1738 executed_by: session
1739 .map(|session| session.authenticated_role_id())
1740 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1741 .to_string(),
1742 }),
1743 )?;
1744 }
1745 Op::RevokeRole {
1746 role_id,
1747 member_id,
1748 grantor_id,
1749 } => {
1750 state.ensure_not_reserved_role(&member_id)?;
1751 state.ensure_grantable_role(&role_id)?;
1752 let mut member_role = state.get_role(&member_id).clone();
1753 member_role.membership.map.remove(&role_id);
1754 tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1755
1756 CatalogState::add_to_audit_log(
1757 &state.system_configuration,
1758 oracle_write_ts,
1759 session,
1760 tx,
1761 audit_events,
1762 EventType::Revoke,
1763 ObjectType::Role,
1764 EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1765 role_id: role_id.to_string(),
1766 member_id: member_id.to_string(),
1767 grantor_id: grantor_id.to_string(),
1768 executed_by: session
1769 .map(|session| session.authenticated_role_id())
1770 .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1771 .to_string(),
1772 }),
1773 )?;
1774 }
1775 Op::UpdatePrivilege {
1776 target_id,
1777 privilege,
1778 variant,
1779 } => {
1780 let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1781 UpdatePrivilegeVariant::Grant => {
1782 privileges.grant(privilege);
1783 }
1784 UpdatePrivilegeVariant::Revoke => {
1785 privileges.revoke(&privilege);
1786 }
1787 };
1788 match &target_id {
1789 SystemObjectId::Object(object_id) => match object_id {
1790 ObjectId::Cluster(id) => {
1791 let mut cluster = state.get_cluster(*id).clone();
1792 update_privilege_fn(&mut cluster.privileges);
1793 tx.update_cluster(*id, cluster.into())?;
1794 }
1795 ObjectId::Database(id) => {
1796 let mut database = state.get_database(id).clone();
1797 update_privilege_fn(&mut database.privileges);
1798 tx.update_database(*id, database.into())?;
1799 }
1800 ObjectId::NetworkPolicy(id) => {
1801 let mut policy = state.get_network_policy(id).clone();
1802 update_privilege_fn(&mut policy.privileges);
1803 tx.update_network_policy(*id, policy.into())?;
1804 }
1805 ObjectId::Schema((database_spec, schema_spec)) => {
1806 let schema_id = schema_spec.clone().into();
1807 let mut schema = state
1808 .get_schema(
1809 database_spec,
1810 schema_spec,
1811 session
1812 .map(|session| session.conn_id())
1813 .unwrap_or(&SYSTEM_CONN_ID),
1814 )
1815 .clone();
1816 update_privilege_fn(&mut schema.privileges);
1817 tx.update_schema(schema_id, schema.into())?;
1818 }
1819 ObjectId::Item(id) => {
1820 let entry = state.get_entry(id);
1821 let mut new_entry = entry.clone();
1822 update_privilege_fn(&mut new_entry.privileges);
1823 if !new_entry.item().is_temporary() {
1824 tx.update_item(*id, new_entry.into())?;
1825 } else {
1826 temporary_item_updates
1827 .push((entry.clone().into(), StateDiff::Retraction));
1828 temporary_item_updates
1829 .push((new_entry.into(), StateDiff::Addition));
1830 }
1831 }
1832 ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1833 },
1834 SystemObjectId::System => {
1835 let mut system_privileges = state.system_privileges.clone();
1836 update_privilege_fn(&mut system_privileges);
1837 let new_privilege =
1838 system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1839 tx.set_system_privilege(
1840 privilege.grantee,
1841 privilege.grantor,
1842 new_privilege.map(|new_privilege| new_privilege.acl_mode),
1843 )?;
1844 }
1845 }
1846 let object_type = state.get_system_object_type(&target_id);
1847 let object_id_str = match &target_id {
1848 SystemObjectId::System => "SYSTEM".to_string(),
1849 SystemObjectId::Object(id) => id.to_string(),
1850 };
1851 CatalogState::add_to_audit_log(
1852 &state.system_configuration,
1853 oracle_write_ts,
1854 session,
1855 tx,
1856 audit_events,
1857 variant.into(),
1858 system_object_type_to_audit_object_type(&object_type),
1859 EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1860 object_id: object_id_str,
1861 grantee_id: privilege.grantee.to_string(),
1862 grantor_id: privilege.grantor.to_string(),
1863 privileges: privilege.acl_mode.to_string(),
1864 }),
1865 )?;
1866 }
1867 Op::UpdateDefaultPrivilege {
1868 privilege_object,
1869 privilege_acl_item,
1870 variant,
1871 } => {
1872 let mut default_privileges = state.default_privileges.clone();
1873 match variant {
1874 UpdatePrivilegeVariant::Grant => default_privileges
1875 .grant(privilege_object.clone(), privilege_acl_item.clone()),
1876 UpdatePrivilegeVariant::Revoke => {
1877 default_privileges.revoke(&privilege_object, &privilege_acl_item)
1878 }
1879 }
1880 let new_acl_mode = default_privileges
1881 .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1882 tx.set_default_privilege(
1883 privilege_object.role_id,
1884 privilege_object.database_id,
1885 privilege_object.schema_id,
1886 privilege_object.object_type,
1887 privilege_acl_item.grantee,
1888 new_acl_mode.cloned(),
1889 )?;
1890 CatalogState::add_to_audit_log(
1891 &state.system_configuration,
1892 oracle_write_ts,
1893 session,
1894 tx,
1895 audit_events,
1896 variant.into(),
1897 object_type_to_audit_object_type(privilege_object.object_type),
1898 EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1899 role_id: privilege_object.role_id.to_string(),
1900 database_id: privilege_object.database_id.map(|id| id.to_string()),
1901 schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1902 grantee_id: privilege_acl_item.grantee.to_string(),
1903 privileges: privilege_acl_item.acl_mode.to_string(),
1904 }),
1905 )?;
1906 }
1907 Op::RenameCluster {
1908 id,
1909 name,
1910 to_name,
1911 check_reserved_names,
1912 } => {
1913 if id.is_system() {
1914 return Err(AdapterError::Catalog(Error::new(
1915 ErrorKind::ReadOnlyCluster(name.clone()),
1916 )));
1917 }
1918 if check_reserved_names && is_reserved_name(&to_name) {
1919 return Err(AdapterError::Catalog(Error::new(
1920 ErrorKind::ReservedClusterName(to_name),
1921 )));
1922 }
1923 tx.rename_cluster(id, &name, &to_name)?;
1924 CatalogState::add_to_audit_log(
1925 &state.system_configuration,
1926 oracle_write_ts,
1927 session,
1928 tx,
1929 audit_events,
1930 EventType::Alter,
1931 ObjectType::Cluster,
1932 EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1933 id: id.to_string(),
1934 old_name: name.clone(),
1935 new_name: to_name.clone(),
1936 }),
1937 )?;
1938 info!("rename cluster {name} to {to_name}");
1939 }
1940 Op::RenameClusterReplica {
1941 cluster_id,
1942 replica_id,
1943 name,
1944 to_name,
1945 } => {
1946 if is_reserved_name(&to_name) {
1947 return Err(AdapterError::Catalog(Error::new(
1948 ErrorKind::ReservedReplicaName(to_name),
1949 )));
1950 }
1951 tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1952 CatalogState::add_to_audit_log(
1953 &state.system_configuration,
1954 oracle_write_ts,
1955 session,
1956 tx,
1957 audit_events,
1958 EventType::Alter,
1959 ObjectType::ClusterReplica,
1960 EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1961 cluster_id: cluster_id.to_string(),
1962 replica_id: replica_id.to_string(),
1963 old_name: name.replica.as_str().to_string(),
1964 new_name: to_name.clone(),
1965 }),
1966 )?;
1967 info!("rename cluster replica {name} to {to_name}");
1968 }
1969 Op::RenameItem {
1970 id,
1971 to_name,
1972 current_full_name,
1973 } => {
1974 let mut updates = Vec::new();
1975
1976 let entry = state.get_entry(&id);
1977 if let CatalogItem::Type(_) = entry.item() {
1978 return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1979 current_full_name.to_string(),
1980 ))));
1981 }
1982
1983 if entry.id().is_system() {
1984 let name = state
1985 .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1986 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1987 name.to_string(),
1988 ))));
1989 }
1990
1991 let mut to_full_name = current_full_name.clone();
1992 to_full_name.item.clone_from(&to_name);
1993
1994 let mut to_qualified_name = entry.name().clone();
1995 to_qualified_name.item.clone_from(&to_name);
1996
1997 let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1998 id: id.to_string(),
1999 old_name: Self::full_name_detail(&current_full_name),
2000 new_name: Self::full_name_detail(&to_full_name),
2001 });
2002 if Self::should_audit_log_item(entry.item()) {
2003 CatalogState::add_to_audit_log(
2004 &state.system_configuration,
2005 oracle_write_ts,
2006 session,
2007 tx,
2008 audit_events,
2009 EventType::Alter,
2010 catalog_type_to_audit_object_type(entry.item().typ()),
2011 details,
2012 )?;
2013 }
2014
2015 let mut new_entry = entry.clone();
2017 new_entry.name.item.clone_from(&to_name);
2018 new_entry.item = entry
2019 .item()
2020 .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2021 .map_err(|e| {
2022 Error::new(ErrorKind::from(AmbiguousRename {
2023 depender: state
2024 .resolve_full_name(entry.name(), entry.conn_id())
2025 .to_string(),
2026 dependee: state
2027 .resolve_full_name(entry.name(), entry.conn_id())
2028 .to_string(),
2029 message: e,
2030 }))
2031 })?;
2032
2033 for id in entry.referenced_by() {
2034 let dependent_item = state.get_entry(id);
2035 let mut to_entry = dependent_item.clone();
2036 to_entry.item = dependent_item
2037 .item()
2038 .rename_item_refs(
2039 current_full_name.clone(),
2040 to_full_name.item.clone(),
2041 false,
2042 )
2043 .map_err(|e| {
2044 Error::new(ErrorKind::from(AmbiguousRename {
2045 depender: state
2046 .resolve_full_name(
2047 dependent_item.name(),
2048 dependent_item.conn_id(),
2049 )
2050 .to_string(),
2051 dependee: state
2052 .resolve_full_name(entry.name(), entry.conn_id())
2053 .to_string(),
2054 message: e,
2055 }))
2056 })?;
2057
2058 if !to_entry.item().is_temporary() {
2059 tx.update_item(*id, to_entry.into())?;
2060 } else {
2061 temporary_item_updates
2062 .push((dependent_item.clone().into(), StateDiff::Retraction));
2063 temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2064 }
2065 updates.push(*id);
2066 }
2067 if !new_entry.item().is_temporary() {
2068 tx.update_item(id, new_entry.into())?;
2069 } else {
2070 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2071 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2072 }
2073
2074 updates.push(id);
2075 for id in updates {
2076 Self::log_update(state, &id);
2077 }
2078 }
2079 Op::RenameSchema {
2080 database_spec,
2081 schema_spec,
2082 new_name,
2083 check_reserved_names,
2084 } => {
2085 if check_reserved_names && is_reserved_name(&new_name) {
2086 return Err(AdapterError::Catalog(Error::new(
2087 ErrorKind::ReservedSchemaName(new_name),
2088 )));
2089 }
2090
2091 let conn_id = session
2092 .map(|session| session.conn_id())
2093 .unwrap_or(&SYSTEM_CONN_ID);
2094
2095 let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2096 let cur_name = schema.name().schema.clone();
2097
2098 let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2099 return Err(AdapterError::Catalog(Error::new(
2100 ErrorKind::AmbientSchemaRename(cur_name),
2101 )));
2102 };
2103 let database = state.get_database(&database_id);
2104 let database_name = &database.name;
2105
2106 let mut updates = Vec::new();
2107 let mut items_to_update = BTreeMap::new();
2108
2109 let mut update_item = |id| {
2110 if items_to_update.contains_key(id) {
2111 return Ok(());
2112 }
2113
2114 let entry = state.get_entry(id);
2115
2116 let mut new_entry = entry.clone();
2118 new_entry.item = entry
2119 .item
2120 .rename_schema_refs(database_name, &cur_name, &new_name)
2121 .map_err(|(s, _i)| {
2122 Error::new(ErrorKind::from(AmbiguousRename {
2123 depender: state
2124 .resolve_full_name(entry.name(), entry.conn_id())
2125 .to_string(),
2126 dependee: format!("{database_name}.{cur_name}"),
2127 message: format!("ambiguous reference to schema named {s}"),
2128 }))
2129 })?;
2130
2131 if !new_entry.item().is_temporary() {
2133 items_to_update.insert(*id, new_entry.into());
2134 } else {
2135 temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2136 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2137 }
2138 updates.push(id);
2139
2140 Ok::<_, AdapterError>(())
2141 };
2142
2143 for (_name, item_id) in &schema.items {
2145 update_item(item_id)?;
2147
2148 for id in state.get_entry(item_id).referenced_by() {
2150 update_item(id)?;
2151 }
2152 }
2153 tx.update_items(items_to_update)?;
2156
2157 let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2159 let schema_name = schema.name().schema.clone();
2160 return Err(AdapterError::Catalog(crate::catalog::Error::new(
2161 crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2162 )));
2163 };
2164
2165 let database_name = database_spec
2167 .id()
2168 .map(|id| state.get_database(&id).name.clone());
2169 let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2170 id: schema_id.to_string(),
2171 old_name: schema.name().schema.clone(),
2172 new_name: new_name.clone(),
2173 database_name,
2174 });
2175 CatalogState::add_to_audit_log(
2176 &state.system_configuration,
2177 oracle_write_ts,
2178 session,
2179 tx,
2180 audit_events,
2181 EventType::Alter,
2182 mz_audit_log::ObjectType::Schema,
2183 details,
2184 )?;
2185
2186 let mut new_schema = schema.clone();
2188 new_schema.name.schema.clone_from(&new_name);
2189 tx.update_schema(schema_id, new_schema.into())?;
2190
2191 for id in updates {
2192 Self::log_update(state, id);
2193 }
2194 }
2195 Op::UpdateOwner { id, new_owner } => {
2196 let conn_id = session
2197 .map(|session| session.conn_id())
2198 .unwrap_or(&SYSTEM_CONN_ID);
2199 let old_owner = state
2200 .get_owner_id(&id, conn_id)
2201 .expect("cannot update the owner of an object without an owner");
2202 match &id {
2203 ObjectId::Cluster(id) => {
2204 let mut cluster = state.get_cluster(*id).clone();
2205 if id.is_system() {
2206 return Err(AdapterError::Catalog(Error::new(
2207 ErrorKind::ReadOnlyCluster(cluster.name),
2208 )));
2209 }
2210 Self::update_privilege_owners(
2211 &mut cluster.privileges,
2212 cluster.owner_id,
2213 new_owner,
2214 );
2215 cluster.owner_id = new_owner;
2216 tx.update_cluster(*id, cluster.into())?;
2217 }
2218 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2219 let cluster = state.get_cluster(*cluster_id);
2220 let mut replica = cluster
2221 .replica(*replica_id)
2222 .expect("catalog out of sync")
2223 .clone();
2224 if replica_id.is_system() {
2225 return Err(AdapterError::Catalog(Error::new(
2226 ErrorKind::ReadOnlyClusterReplica(replica.name),
2227 )));
2228 }
2229 replica.owner_id = new_owner;
2230 tx.update_cluster_replica(*replica_id, replica.into())?;
2231 }
2232 ObjectId::Database(id) => {
2233 let mut database = state.get_database(id).clone();
2234 if id.is_system() {
2235 return Err(AdapterError::Catalog(Error::new(
2236 ErrorKind::ReadOnlyDatabase(database.name),
2237 )));
2238 }
2239 Self::update_privilege_owners(
2240 &mut database.privileges,
2241 database.owner_id,
2242 new_owner,
2243 );
2244 database.owner_id = new_owner;
2245 tx.update_database(*id, database.clone().into())?;
2246 }
2247 ObjectId::Schema((database_spec, schema_spec)) => {
2248 let schema_id: SchemaId = schema_spec.clone().into();
2249 let mut schema = state
2250 .get_schema(database_spec, schema_spec, conn_id)
2251 .clone();
2252 if schema_id.is_system() {
2253 let name = schema.name();
2254 let full_name = state.resolve_full_schema_name(name);
2255 return Err(AdapterError::Catalog(Error::new(
2256 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2257 )));
2258 }
2259 Self::update_privilege_owners(
2260 &mut schema.privileges,
2261 schema.owner_id,
2262 new_owner,
2263 );
2264 schema.owner_id = new_owner;
2265 tx.update_schema(schema_id, schema.into())?;
2266 }
2267 ObjectId::Item(id) => {
2268 let entry = state.get_entry(id);
2269 let mut new_entry = entry.clone();
2270 if id.is_system() {
2271 let full_name = state.resolve_full_name(
2272 new_entry.name(),
2273 session.map(|session| session.conn_id()),
2274 );
2275 return Err(AdapterError::Catalog(Error::new(
2276 ErrorKind::ReadOnlyItem(full_name.to_string()),
2277 )));
2278 }
2279 Self::update_privilege_owners(
2280 &mut new_entry.privileges,
2281 new_entry.owner_id,
2282 new_owner,
2283 );
2284 new_entry.owner_id = new_owner;
2285 if !new_entry.item().is_temporary() {
2286 tx.update_item(*id, new_entry.into())?;
2287 } else {
2288 temporary_item_updates
2289 .push((entry.clone().into(), StateDiff::Retraction));
2290 temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2291 }
2292 }
2293 ObjectId::NetworkPolicy(id) => {
2294 let mut policy = state.get_network_policy(id).clone();
2295 if id.is_system() {
2296 return Err(AdapterError::Catalog(Error::new(
2297 ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2298 )));
2299 }
2300 Self::update_privilege_owners(
2301 &mut policy.privileges,
2302 policy.owner_id,
2303 new_owner,
2304 );
2305 policy.owner_id = new_owner;
2306 tx.update_network_policy(*id, policy.into())?;
2307 }
2308 ObjectId::Role(_) => unreachable!("roles have no owner"),
2309 }
2310 let object_type = state.get_object_type(&id);
2311 CatalogState::add_to_audit_log(
2312 &state.system_configuration,
2313 oracle_write_ts,
2314 session,
2315 tx,
2316 audit_events,
2317 EventType::Alter,
2318 object_type_to_audit_object_type(object_type),
2319 EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2320 object_id: id.to_string(),
2321 old_owner_id: old_owner.to_string(),
2322 new_owner_id: new_owner.to_string(),
2323 }),
2324 )?;
2325 }
2326 Op::UpdateClusterConfig { id, name, config } => {
2327 let mut cluster = state.get_cluster(id).clone();
2328 cluster.config = config;
2329 tx.update_cluster(id, cluster.into())?;
2330 info!("update cluster {}", name);
2331
2332 CatalogState::add_to_audit_log(
2333 &state.system_configuration,
2334 oracle_write_ts,
2335 session,
2336 tx,
2337 audit_events,
2338 EventType::Alter,
2339 ObjectType::Cluster,
2340 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2341 id: id.to_string(),
2342 name,
2343 }),
2344 )?;
2345 }
2346 Op::UpdateClusterReplicaConfig {
2347 replica_id,
2348 cluster_id,
2349 config,
2350 } => {
2351 let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2352 info!("update replica {}", replica.name);
2353 tx.update_cluster_replica(
2354 replica_id,
2355 mz_catalog::durable::ClusterReplica {
2356 cluster_id,
2357 replica_id,
2358 name: replica.name.clone(),
2359 config: config.clone().into(),
2360 owner_id: replica.owner_id,
2361 },
2362 )?;
2363 }
2364 Op::UpdateItem { id, name, to_item } => {
2365 let mut entry = state.get_entry(&id).clone();
2366 entry.name = name.clone();
2367 entry.item = to_item.clone();
2368 tx.update_item(id, entry.into())?;
2369
2370 if Self::should_audit_log_item(&to_item) {
2371 let mut full_name = Self::full_name_detail(
2372 &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2373 );
2374 full_name.item = name.item;
2375
2376 CatalogState::add_to_audit_log(
2377 &state.system_configuration,
2378 oracle_write_ts,
2379 session,
2380 tx,
2381 audit_events,
2382 EventType::Alter,
2383 catalog_type_to_audit_object_type(to_item.typ()),
2384 EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2385 id: id.to_string(),
2386 name: full_name,
2387 }),
2388 )?;
2389 }
2390
2391 Self::log_update(state, &id);
2392 }
2393 Op::UpdateSystemConfiguration { name, value } => {
2394 let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2395 tx.upsert_system_config(&name, parsed_value.clone())?;
2396 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2401 let with_0dt_deployment_max_wait =
2402 Duration::parse(VarInput::Flat(&parsed_value))
2403 .expect("parsing succeeded above");
2404 tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2405 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2406 let with_0dt_deployment_ddl_check_interval =
2407 Duration::parse(VarInput::Flat(&parsed_value))
2408 .expect("parsing succeeded above");
2409 tx.set_0dt_deployment_ddl_check_interval(
2410 with_0dt_deployment_ddl_check_interval,
2411 )?;
2412 } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2413 let panic_after_timeout =
2414 strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2415 tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2416 }
2417
2418 CatalogState::add_to_audit_log(
2419 &state.system_configuration,
2420 oracle_write_ts,
2421 session,
2422 tx,
2423 audit_events,
2424 EventType::Alter,
2425 ObjectType::System,
2426 EventDetails::SetV1(mz_audit_log::SetV1 {
2427 name,
2428 value: Some(value.borrow().to_vec().join(", ")),
2429 }),
2430 )?;
2431 }
2432 Op::ResetSystemConfiguration { name } => {
2433 tx.remove_system_config(&name);
2434 if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2439 tx.reset_0dt_deployment_max_wait()?;
2440 } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2441 tx.reset_0dt_deployment_ddl_check_interval()?;
2442 }
2443
2444 CatalogState::add_to_audit_log(
2445 &state.system_configuration,
2446 oracle_write_ts,
2447 session,
2448 tx,
2449 audit_events,
2450 EventType::Alter,
2451 ObjectType::System,
2452 EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2453 )?;
2454 }
2455 Op::ResetAllSystemConfiguration => {
2456 tx.clear_system_configs();
2457 tx.reset_0dt_deployment_max_wait()?;
2458 tx.reset_0dt_deployment_ddl_check_interval()?;
2459
2460 CatalogState::add_to_audit_log(
2461 &state.system_configuration,
2462 oracle_write_ts,
2463 session,
2464 tx,
2465 audit_events,
2466 EventType::Alter,
2467 ObjectType::System,
2468 EventDetails::ResetAllV1,
2469 )?;
2470 }
2471 Op::WeirdStorageUsageUpdates {
2472 object_id,
2473 size_bytes,
2474 collection_timestamp,
2475 } => {
2476 let id = tx.allocate_storage_usage_ids()?;
2477 let metric =
2478 VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2479 let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2480 let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2481 weird_builtin_table_update = Some(builtin_table_update);
2482 }
2483 };
2484 Ok((weird_builtin_table_update, temporary_item_updates))
2485 }
2486
2487 fn log_update(state: &CatalogState, id: &CatalogItemId) {
2488 let entry = state.get_entry(id);
2489 info!(
2490 "update {} {} ({})",
2491 entry.item_type(),
2492 state.resolve_full_name(entry.name(), entry.conn_id()),
2493 id
2494 );
2495 }
2496
2497 fn update_privilege_owners(
2501 privileges: &mut PrivilegeMap,
2502 old_owner: RoleId,
2503 new_owner: RoleId,
2504 ) {
2505 let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2507
2508 let mut new_present = false;
2509 for privilege in flat_privileges.iter_mut() {
2510 if privilege.grantor == old_owner {
2513 privilege.grantor = new_owner;
2514 } else if privilege.grantor == new_owner {
2515 new_present = true;
2516 }
2517 if privilege.grantee == old_owner {
2519 privilege.grantee = new_owner;
2520 } else if privilege.grantee == new_owner {
2521 new_present = true;
2522 }
2523 }
2524
2525 if new_present {
2529 let privilege_map: BTreeMap<_, Vec<_>> =
2531 flat_privileges
2532 .into_iter()
2533 .fold(BTreeMap::new(), |mut accum, privilege| {
2534 accum
2535 .entry((privilege.grantee, privilege.grantor))
2536 .or_default()
2537 .push(privilege);
2538 accum
2539 });
2540
2541 flat_privileges = privilege_map
2543 .into_iter()
2544 .map(|((grantee, grantor), values)|
2545 values.into_iter().fold(
2547 MzAclItem::empty(grantee, grantor),
2548 |mut accum, mz_aclitem| {
2549 accum.acl_mode =
2550 accum.acl_mode.union(mz_aclitem.acl_mode);
2551 accum
2552 },
2553 ))
2554 .collect();
2555 }
2556
2557 *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2558 }
2559}
2560
#[derive(Debug, Default)]
/// Accumulates the identifiers of catalog objects that have been selected for
/// dropping, grouped by object kind.
///
/// Instances start out empty (via `Default`) and are filled in by
/// [`ObjectsToDrop::generate`], which processes one `DropObjectInfo` at a time.
pub(crate) struct ObjectsToDrop {
    /// Comment ids, as returned by `CatalogState::get_comment_id`, for every
    /// object that was processed.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, keyed by schema specifier and mapped to the specifier of
    /// the database they were reported under.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop.
    pub clusters: BTreeSet<ClusterId>,
    /// Cluster replicas to drop, mapped to their owning cluster and the reason
    /// value supplied with the drop request.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop.
    pub roles: BTreeSet<RoleId>,
    /// Catalog items to drop, in the order in which they were added.
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
2579
2580impl ObjectsToDrop {
2581 pub fn generate(
2582 drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2583 state: &CatalogState,
2584 session: Option<&ConnMeta>,
2585 ) -> Result<Self, AdapterError> {
2586 let mut delta = ObjectsToDrop::default();
2587
2588 for drop_object_info in drop_object_infos {
2589 delta.add_item(drop_object_info, state, session)?;
2590 }
2591
2592 Ok(delta)
2593 }
2594
2595 fn add_item(
2596 &mut self,
2597 drop_object_info: DropObjectInfo,
2598 state: &CatalogState,
2599 session: Option<&ConnMeta>,
2600 ) -> Result<(), AdapterError> {
2601 self.comments
2602 .insert(state.get_comment_id(drop_object_info.to_object_id()));
2603
2604 match drop_object_info {
2605 DropObjectInfo::Database(database_id) => {
2606 let database = &state.database_by_id[&database_id];
2607 if database_id.is_system() {
2608 return Err(AdapterError::Catalog(Error::new(
2609 ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2610 )));
2611 }
2612
2613 self.databases.insert(database_id);
2614 }
2615 DropObjectInfo::Schema((database_spec, schema_spec)) => {
2616 let schema = state.get_schema(
2617 &database_spec,
2618 &schema_spec,
2619 session
2620 .map(|session| session.conn_id())
2621 .unwrap_or(&SYSTEM_CONN_ID),
2622 );
2623 let schema_id: SchemaId = schema_spec.into();
2624 if schema_id.is_system() {
2625 let name = schema.name();
2626 let full_name = state.resolve_full_schema_name(name);
2627 return Err(AdapterError::Catalog(Error::new(
2628 ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2629 )));
2630 }
2631
2632 self.schemas.insert(schema_spec, database_spec);
2633 }
2634 DropObjectInfo::Role(role_id) => {
2635 let name = state.get_role(&role_id).name().to_string();
2636 if role_id.is_system() || role_id.is_predefined() {
2637 return Err(AdapterError::Catalog(Error::new(
2638 ErrorKind::ReservedRoleName(name.clone()),
2639 )));
2640 }
2641 state.ensure_not_reserved_role(&role_id)?;
2642
2643 self.roles.insert(role_id);
2644 }
2645 DropObjectInfo::Cluster(cluster_id) => {
2646 let cluster = state.get_cluster(cluster_id);
2647 let name = &cluster.name;
2648 if cluster_id.is_system() {
2649 return Err(AdapterError::Catalog(Error::new(
2650 ErrorKind::ReadOnlyCluster(name.clone()),
2651 )));
2652 }
2653
2654 self.clusters.insert(cluster_id);
2655 }
2656 DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2657 let cluster = state.get_cluster(cluster_id);
2658 let replica = cluster.replica(replica_id).expect("Must exist");
2659
2660 self.replicas
2661 .insert(replica.replica_id, (cluster.id, reason));
2662 }
2663 DropObjectInfo::Item(item_id) => {
2664 let entry = state.get_entry(&item_id);
2665 if item_id.is_system() {
2666 let name = entry.name();
2667 let full_name =
2668 state.resolve_full_name(name, session.map(|session| session.conn_id()));
2669 return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2670 full_name.to_string(),
2671 ))));
2672 }
2673
2674 self.items.push(item_id);
2675 }
2676 DropObjectInfo::NetworkPolicy(network_policy_id) => {
2677 let policy = state.get_network_policy(&network_policy_id);
2678 let name = &policy.name;
2679 if network_policy_id.is_system() {
2680 return Err(AdapterError::Catalog(Error::new(
2681 ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2682 )));
2683 }
2684
2685 self.network_policies.insert(network_policy_id);
2686 }
2687 }
2688
2689 Ok(())
2690 }
2691}
2692
#[cfg(test)]
mod tests {
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;

    use crate::catalog::Catalog;

    /// Shorthand for building an ACL item from its three fields.
    fn acl(grantee: RoleId, grantor: RoleId, acl_mode: AclMode) -> MzAclItem {
        MzAclItem {
            grantee,
            grantor,
            acl_mode,
        }
    }

    /// Checks that transferring ownership merges privileges that end up with the
    /// same grantee and grantor into a single item with the union of their modes.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        // Each case is (initial ACL items, the single item expected afterwards).
        let cases = vec![
            // Old and new owner both granted a privilege to the same third role.
            (
                vec![
                    acl(other_role, old_owner, AclMode::UPDATE),
                    acl(other_role, new_owner, AclMode::SELECT),
                ],
                acl(
                    other_role,
                    new_owner,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
            // Old and new owner were both granted a privilege by the same third role.
            (
                vec![
                    acl(old_owner, other_role, AclMode::UPDATE),
                    acl(new_owner, other_role, AclMode::SELECT),
                ],
                acl(
                    new_owner,
                    other_role,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
            // Old and new owner each granted a privilege to themselves.
            (
                vec![
                    acl(old_owner, old_owner, AclMode::UPDATE),
                    acl(new_owner, new_owner, AclMode::SELECT),
                ],
                acl(
                    new_owner,
                    new_owner,
                    AclMode::SELECT.union(AclMode::UPDATE),
                ),
            ),
        ];

        for (initial, expected) in cases {
            let mut privileges = PrivilegeMap::from_mz_acl_items(initial);
            Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
            assert_eq!(1, privileges.all_values().count());
            assert_eq!(
                vec![expected],
                privileges.all_values_owned().collect::<Vec<_>>()
            );
        }
    }
}