mz_adapter/catalog/
transact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT,
22    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
34    StateUpdateKind, TemporaryItem,
35};
36use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
37use mz_controller_types::{ClusterId, ReplicaId};
38use mz_ore::collections::HashSet;
39use mz_ore::instrument;
40use mz_ore::now::EpochMillis;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::role_id::RoleId;
45use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
46use mz_sql::ast::RawDataType;
47use mz_sql::catalog::{
48    CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
49    CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, RoleAttributes, RoleMembership,
50    RoleVars,
51};
52use mz_sql::names::{
53    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
54    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
55};
56use mz_sql::plan::{NetworkPolicyRule, PlanError};
57use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
58use mz_sql::session::vars::OwnedVarInput;
59use mz_sql::session::vars::{Value as VarValue, VarInput};
60use mz_sql::{DEFAULT_SCHEMA, rbac};
61use mz_sql_parser::ast::{QualifiedReplica, Value};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{info, trace};
64
65use crate::AdapterError;
66use crate::catalog::{
67    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
68    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
69    is_reserved_role_name, object_type_to_audit_object_type,
70    system_object_type_to_audit_object_type,
71};
72use crate::coord::ConnMeta;
73use crate::coord::cluster_scheduling::SchedulingDecision;
74use crate::util::ResultExt;
75
76#[derive(Debug, Clone)]
77pub enum Op {
78    AlterRetainHistory {
79        id: CatalogItemId,
80        value: Option<Value>,
81        window: CompactionWindow,
82    },
83    AlterRole {
84        id: RoleId,
85        name: String,
86        attributes: RoleAttributes,
87        vars: RoleVars,
88    },
89    AlterNetworkPolicy {
90        id: NetworkPolicyId,
91        rules: Vec<NetworkPolicyRule>,
92        name: String,
93        owner_id: RoleId,
94    },
95    AlterAddColumn {
96        id: CatalogItemId,
97        new_global_id: GlobalId,
98        name: ColumnName,
99        typ: SqlColumnType,
100        sql: RawDataType,
101    },
102    CreateDatabase {
103        name: String,
104        owner_id: RoleId,
105    },
106    CreateSchema {
107        database_id: ResolvedDatabaseSpecifier,
108        schema_name: String,
109        owner_id: RoleId,
110    },
111    CreateRole {
112        name: String,
113        attributes: RoleAttributes,
114    },
115    CreateCluster {
116        id: ClusterId,
117        name: String,
118        introspection_sources: Vec<&'static BuiltinLog>,
119        owner_id: RoleId,
120        config: ClusterConfig,
121    },
122    CreateClusterReplica {
123        cluster_id: ClusterId,
124        name: String,
125        config: ReplicaConfig,
126        owner_id: RoleId,
127        reason: ReplicaCreateDropReason,
128    },
129    CreateItem {
130        id: CatalogItemId,
131        name: QualifiedItemName,
132        item: CatalogItem,
133        owner_id: RoleId,
134    },
135    CreateNetworkPolicy {
136        rules: Vec<NetworkPolicyRule>,
137        name: String,
138        owner_id: RoleId,
139    },
140    Comment {
141        object_id: CommentObjectId,
142        sub_component: Option<usize>,
143        comment: Option<String>,
144    },
145    DropObjects(Vec<DropObjectInfo>),
146    GrantRole {
147        role_id: RoleId,
148        member_id: RoleId,
149        grantor_id: RoleId,
150    },
151    RenameCluster {
152        id: ClusterId,
153        name: String,
154        to_name: String,
155        check_reserved_names: bool,
156    },
157    RenameClusterReplica {
158        cluster_id: ClusterId,
159        replica_id: ReplicaId,
160        name: QualifiedReplica,
161        to_name: String,
162    },
163    RenameItem {
164        id: CatalogItemId,
165        current_full_name: FullItemName,
166        to_name: String,
167    },
168    RenameSchema {
169        database_spec: ResolvedDatabaseSpecifier,
170        schema_spec: SchemaSpecifier,
171        new_name: String,
172        check_reserved_names: bool,
173    },
174    UpdateOwner {
175        id: ObjectId,
176        new_owner: RoleId,
177    },
178    UpdatePrivilege {
179        target_id: SystemObjectId,
180        privilege: MzAclItem,
181        variant: UpdatePrivilegeVariant,
182    },
183    UpdateDefaultPrivilege {
184        privilege_object: DefaultPrivilegeObject,
185        privilege_acl_item: DefaultPrivilegeAclItem,
186        variant: UpdatePrivilegeVariant,
187    },
188    RevokeRole {
189        role_id: RoleId,
190        member_id: RoleId,
191        grantor_id: RoleId,
192    },
193    UpdateClusterConfig {
194        id: ClusterId,
195        name: String,
196        config: ClusterConfig,
197    },
198    UpdateClusterReplicaConfig {
199        cluster_id: ClusterId,
200        replica_id: ReplicaId,
201        config: ReplicaConfig,
202    },
203    UpdateItem {
204        id: CatalogItemId,
205        name: QualifiedItemName,
206        to_item: CatalogItem,
207    },
208    UpdateSourceReferences {
209        source_id: CatalogItemId,
210        references: SourceReferences,
211    },
212    UpdateSystemConfiguration {
213        name: String,
214        value: OwnedVarInput,
215    },
216    ResetSystemConfiguration {
217        name: String,
218    },
219    ResetAllSystemConfiguration,
220    /// Performs updates to the storage usage table, which probably should be a builtin source.
221    ///
222    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
223    /// might not work. If a process crashes after collecting storage usage events
224    /// but before updating the builtin table, then another listening catalog
225    /// will never know to update the builtin table.
226    WeirdStorageUsageUpdates {
227        object_id: Option<String>,
228        size_bytes: u64,
229        collection_timestamp: EpochMillis,
230    },
231    /// Performs a dry run of the commit, but errors with
232    /// [`AdapterError::TransactionDryRun`].
233    ///
234    /// When using this value, it should be included only as the last element of
235    /// the transaction and should not be the only value in the transaction.
236    TransactionDryRun,
237}
238
239/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
240/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
241/// the `Op::DropObjects`.
242#[derive(Debug, Clone)]
243pub enum DropObjectInfo {
244    Cluster(ClusterId),
245    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
246    Database(DatabaseId),
247    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
248    Role(RoleId),
249    Item(CatalogItemId),
250    NetworkPolicy(NetworkPolicyId),
251}
252
253impl DropObjectInfo {
254    /// Creates a `DropObjectInfo` from an `ObjectId`.
255    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
256    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
257        match id {
258            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
259            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
260                cluster_id,
261                replica_id,
262                ReplicaCreateDropReason::Manual,
263            )),
264            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
265            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
266            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
267            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
268            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
269        }
270    }
271
272    /// Creates an `ObjectId` from a `DropObjectInfo`.
273    /// Loses the `ReplicaCreateDropReason` if there is one!
274    fn to_object_id(&self) -> ObjectId {
275        match &self {
276            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
277            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
278                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
279            }
280            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
281            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
282            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
283            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
284            DropObjectInfo::NetworkPolicy(network_policy_id) => {
285                ObjectId::NetworkPolicy(network_policy_id.clone())
286            }
287        }
288    }
289}
290
291/// The reason for creating or dropping a replica.
292#[derive(Debug, Clone)]
293pub enum ReplicaCreateDropReason {
294    /// The user initiated the replica create or drop, e.g., by
295    /// - creating/dropping a cluster,
296    /// - ALTERing various options on a managed cluster,
297    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
298    Manual,
299    /// The automated cluster scheduling initiated the replica create or drop, e.g., a
300    /// materialized view is needing a refresh on a SCHEDULE ON REFRESH cluster.
301    ClusterScheduling(Vec<SchedulingDecision>),
302}
303
304impl ReplicaCreateDropReason {
305    pub fn into_audit_log(
306        self,
307    ) -> (
308        CreateOrDropClusterReplicaReasonV1,
309        Option<SchedulingDecisionsWithReasonsV2>,
310    ) {
311        let (reason, scheduling_policies) = match self {
312            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
313            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
314                CreateOrDropClusterReplicaReasonV1::Schedule,
315                Some(scheduling_decisions),
316            ),
317        };
318        (
319            reason,
320            scheduling_policies
321                .as_ref()
322                .map(SchedulingDecision::reasons_to_audit_log_reasons),
323        )
324    }
325}
326
327pub struct TransactionResult {
328    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
329    pub audit_events: Vec<VersionedEvent>,
330}
331
332impl Catalog {
333    fn should_audit_log_item(item: &CatalogItem) -> bool {
334        !item.is_temporary()
335    }
336
337    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
338    /// within a connection id.
339    fn temporary_ids(
340        &self,
341        ops: &[Op],
342        temporary_drops: BTreeSet<(&ConnectionId, String)>,
343    ) -> Result<BTreeSet<CatalogItemId>, Error> {
344        let mut creating = BTreeSet::new();
345        let mut temporary_ids = BTreeSet::new();
346        for op in ops.iter() {
347            if let Op::CreateItem {
348                id,
349                name,
350                item,
351                owner_id: _,
352            } = op
353            {
354                if let Some(conn_id) = item.conn_id() {
355                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
356                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
357                        || creating.contains(&(conn_id, &name.item))
358                    {
359                        return Err(
360                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
361                        );
362                    } else {
363                        creating.insert((conn_id, &name.item));
364                        temporary_ids.insert(id.clone());
365                    }
366                }
367            }
368        }
369        Ok(temporary_ids)
370    }
371
372    #[instrument(name = "catalog::transact")]
373    pub async fn transact(
374        &mut self,
375        // n.b. this is an option to prevent us from needing to build out a
376        // dummy impl of `StorageController` for tests.
377        storage_collections: Option<
378            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
379        >,
380        oracle_write_ts: mz_repr::Timestamp,
381        session: Option<&ConnMeta>,
382        ops: Vec<Op>,
383    ) -> Result<TransactionResult, AdapterError> {
384        trace!("transact: {:?}", ops);
385        fail::fail_point!("catalog_transact", |arg| {
386            Err(AdapterError::Unstructured(anyhow::anyhow!(
387                "failpoint: {arg:?}"
388            )))
389        });
390
391        let drop_ids: BTreeSet<CatalogItemId> = ops
392            .iter()
393            .filter_map(|op| match op {
394                Op::DropObjects(drop_object_infos) => {
395                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
396                    let item_ids = ids.filter_map(|id| match id {
397                        ObjectId::Item(id) => Some(id),
398                        _ => None,
399                    });
400                    Some(item_ids)
401                }
402                _ => None,
403            })
404            .flatten()
405            .collect();
406        let temporary_drops = drop_ids
407            .iter()
408            .filter_map(|id| {
409                let entry = self.get_entry(id);
410                match entry.item().conn_id() {
411                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
412                    None => None,
413                }
414            })
415            .collect();
416        let dropped_global_ids = drop_ids
417            .iter()
418            .flat_map(|item_id| self.get_global_ids(item_id))
419            .collect();
420
421        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
422        let mut builtin_table_updates = vec![];
423        let mut audit_events = vec![];
424        let mut storage = self.storage().await;
425        let mut tx = storage
426            .transaction()
427            .await
428            .unwrap_or_terminate("starting catalog transaction");
429
430        let new_state = Self::transact_inner(
431            storage_collections,
432            oracle_write_ts,
433            session,
434            ops,
435            temporary_ids,
436            &mut builtin_table_updates,
437            &mut audit_events,
438            &mut tx,
439            &self.state,
440        )
441        .await?;
442
443        // The user closure was successful, apply the updates. Terminate the
444        // process if this fails, because we have to restart envd due to
445        // indeterminate catalog state, which we only reconcile during catalog
446        // init.
447        tx.commit(oracle_write_ts)
448            .await
449            .unwrap_or_terminate("catalog storage transaction commit must succeed");
450
451        // Dropping here keeps the mutable borrow on self, preventing us accidentally
452        // mutating anything until after f is executed.
453        drop(storage);
454        if let Some(new_state) = new_state {
455            self.transient_revision += 1;
456            self.state = new_state;
457        }
458
459        // Drop in-memory planning metadata.
460        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
461        if self.state.system_config().enable_mz_notices() {
462            // Generate retractions for the Builtin tables.
463            self.state().pack_optimizer_notices(
464                &mut builtin_table_updates,
465                dropped_notices.iter(),
466                Diff::MINUS_ONE,
467            );
468        }
469
470        Ok(TransactionResult {
471            builtin_table_updates,
472            audit_events,
473        })
474    }
475
476    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
477    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
478    ///
479    /// # Panics
480    /// - If `ops` contains [`Op::TransactionDryRun`] and the value is not the
481    ///   final element.
482    /// - If the only element of `ops` is [`Op::TransactionDryRun`].
483    #[instrument(name = "catalog::transact_inner")]
484    async fn transact_inner(
485        storage_collections: Option<
486            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
487        >,
488        oracle_write_ts: mz_repr::Timestamp,
489        session: Option<&ConnMeta>,
490        mut ops: Vec<Op>,
491        temporary_ids: BTreeSet<CatalogItemId>,
492        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
493        audit_events: &mut Vec<VersionedEvent>,
494        tx: &mut Transaction<'_>,
495        state: &CatalogState,
496    ) -> Result<Option<CatalogState>, AdapterError> {
497        let mut state = Cow::Borrowed(state);
498
499        let dry_run_ops = match ops.last() {
500            Some(Op::TransactionDryRun) => {
501                // Remove dry run marker.
502                ops.pop();
503                assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
504                ops.clone()
505            }
506            Some(_) => vec![],
507            None => return Ok(None),
508        };
509
510        let mut storage_collections_to_create = BTreeSet::new();
511        let mut storage_collections_to_drop = BTreeSet::new();
512        let mut storage_collections_to_register = BTreeMap::new();
513
514        for op in ops {
515            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
516                oracle_write_ts,
517                session,
518                op,
519                &temporary_ids,
520                audit_events,
521                tx,
522                &*state,
523                &mut storage_collections_to_create,
524                &mut storage_collections_to_drop,
525                &mut storage_collections_to_register,
526            )
527            .await?;
528
529            // Certain builtin tables are not derived from the durable catalog state, so they need
530            // to be updated ad-hoc based on the current transaction. This is weird and will not
531            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
532            // this instance crashes after committing the durable catalog but before applying the
533            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
534            // world. Currently, this works fine and there are no correctness issues because
535            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
536            // state of all builtin tables to the correct values.
537            if let Some(builtin_table_update) = weird_builtin_table_update {
538                builtin_table_updates.push(builtin_table_update);
539            }
540
541            // Temporary items are not stored in the durable catalog, so they need to be handled
542            // separately for updating state and builtin tables.
543            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
544            // in a multi-subscriber catalog world.
545            let op_id = tx.op_id().into();
546            let temporary_item_updates =
547                temporary_item_updates
548                    .into_iter()
549                    .map(|(item, diff)| StateUpdate {
550                        kind: StateUpdateKind::TemporaryItem(item),
551                        ts: op_id,
552                        diff,
553                    });
554
555            let mut updates: Vec<_> = tx.get_and_commit_op_updates();
556            updates.extend(temporary_item_updates);
557            if !updates.is_empty() {
558                let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
559                let op_builtin_table_updates = state
560                    .to_mut()
561                    .resolve_builtin_table_updates(op_builtin_table_updates);
562                builtin_table_updates.extend(op_builtin_table_updates);
563            }
564        }
565
566        if dry_run_ops.is_empty() {
567            // `storage_collections` should only be `None` for tests.
568            if let Some(c) = storage_collections {
569                c.prepare_state(
570                    tx,
571                    storage_collections_to_create,
572                    storage_collections_to_drop,
573                    storage_collections_to_register,
574                )
575                .await?;
576            }
577
578            let updates = tx.get_and_commit_op_updates();
579            if !updates.is_empty() {
580                let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
581                let op_builtin_table_updates = state
582                    .to_mut()
583                    .resolve_builtin_table_updates(op_builtin_table_updates);
584                builtin_table_updates.extend(op_builtin_table_updates);
585            }
586
587            match state {
588                Cow::Owned(state) => Ok(Some(state)),
589                Cow::Borrowed(_) => Ok(None),
590            }
591        } else {
592            Err(AdapterError::TransactionDryRun {
593                new_ops: dry_run_ops,
594                new_state: state.into_owned(),
595            })
596        }
597    }
598
599    /// Performs the transaction operation described by `op`. This function prepares the changes in
600    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
601    /// changes.
602    ///
603    /// Optionally returns a builtin table update for any builtin table updates than cannot be
604    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
605    /// scenarios and ideally in the future don't exist.
606    #[instrument]
607    async fn transact_op(
608        oracle_write_ts: mz_repr::Timestamp,
609        session: Option<&ConnMeta>,
610        op: Op,
611        temporary_ids: &BTreeSet<CatalogItemId>,
612        audit_events: &mut Vec<VersionedEvent>,
613        tx: &mut Transaction<'_>,
614        state: &CatalogState,
615        storage_collections_to_create: &mut BTreeSet<GlobalId>,
616        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
617        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
618    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
619        let mut weird_builtin_table_update = None;
620        let mut temporary_item_updates = Vec::new();
621
622        match op {
623            Op::TransactionDryRun => {
624                unreachable!("TransactionDryRun can only be used a final element of ops")
625            }
626            Op::AlterRetainHistory { id, value, window } => {
627                let entry = state.get_entry(&id);
628                if id.is_system() {
629                    let name = entry.name();
630                    let full_name =
631                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
632                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
633                        full_name.to_string(),
634                    ))));
635                }
636
637                let mut new_entry = entry.clone();
638                let previous = new_entry
639                    .item
640                    .update_retain_history(value.clone(), window)
641                    .map_err(|_| {
642                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
643                            "planner should have rejected invalid alter retain history item type"
644                                .to_string(),
645                        )))
646                    })?;
647
648                if Self::should_audit_log_item(new_entry.item()) {
649                    let details =
650                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
651                            id: id.to_string(),
652                            old_history: previous.map(|previous| previous.to_string()),
653                            new_history: value.map(|v| v.to_string()),
654                        });
655                    CatalogState::add_to_audit_log(
656                        &state.system_configuration,
657                        oracle_write_ts,
658                        session,
659                        tx,
660                        audit_events,
661                        EventType::Alter,
662                        catalog_type_to_audit_object_type(new_entry.item().typ()),
663                        details,
664                    )?;
665                }
666
667                tx.update_item(id, new_entry.into())?;
668
669                Self::log_update(state, &id);
670            }
671            Op::AlterRole {
672                id,
673                name,
674                attributes,
675                vars,
676            } => {
677                state.ensure_not_reserved_role(&id)?;
678
679                let mut existing_role = state.get_role(&id).clone();
680                existing_role.attributes = attributes;
681                existing_role.vars = vars;
682                tx.update_role(id, existing_role.into())?;
683
684                CatalogState::add_to_audit_log(
685                    &state.system_configuration,
686                    oracle_write_ts,
687                    session,
688                    tx,
689                    audit_events,
690                    EventType::Alter,
691                    ObjectType::Role,
692                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
693                        id: id.to_string(),
694                        name: name.clone(),
695                    }),
696                )?;
697
698                info!("update role {name} ({id})");
699            }
700            Op::AlterNetworkPolicy {
701                id,
702                rules,
703                name,
704                owner_id: _owner_id,
705            } => {
706                let existing_policy = state.get_network_policy(&id).clone();
707                let mut policy: NetworkPolicy = existing_policy.into();
708                policy.rules = rules;
709                if is_reserved_name(&name) {
710                    return Err(AdapterError::Catalog(Error::new(
711                        ErrorKind::ReservedNetworkPolicyName(name),
712                    )));
713                }
714                tx.update_network_policy(id, policy.clone())?;
715
716                CatalogState::add_to_audit_log(
717                    &state.system_configuration,
718                    oracle_write_ts,
719                    session,
720                    tx,
721                    audit_events,
722                    EventType::Alter,
723                    ObjectType::NetworkPolicy,
724                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
725                        id: id.to_string(),
726                        name: name.clone(),
727                    }),
728                )?;
729
730                info!("update network policy {name} ({id})");
731            }
732            Op::AlterAddColumn {
733                id,
734                new_global_id,
735                name,
736                typ,
737                sql,
738            } => {
739                let mut new_entry = state.get_entry(&id).clone();
740                let version = new_entry.item.add_column(name, typ, sql)?;
741                // All versions of a table share the same shard, so it shouldn't matter what
742                // GlobalId we use here.
743                let shard_id = state
744                    .storage_metadata()
745                    .get_collection_shard(new_entry.latest_global_id())?;
746
747                // TODO(alter_table): Support adding columns to sources.
748                let CatalogItem::Table(table) = &mut new_entry.item else {
749                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
750                };
751                table.collections.insert(version, new_global_id);
752
753                tx.update_item(id, new_entry.into())?;
754                storage_collections_to_register.insert(new_global_id, shard_id);
755            }
756            Op::CreateDatabase { name, owner_id } => {
757                let database_owner_privileges = vec![rbac::owner_privilege(
758                    mz_sql::catalog::ObjectType::Database,
759                    owner_id,
760                )];
761                let database_default_privileges = state
762                    .default_privileges
763                    .get_applicable_privileges(
764                        owner_id,
765                        None,
766                        None,
767                        mz_sql::catalog::ObjectType::Database,
768                    )
769                    .map(|item| item.mz_acl_item(owner_id));
770                let database_privileges: Vec<_> = merge_mz_acl_items(
771                    database_owner_privileges
772                        .into_iter()
773                        .chain(database_default_privileges),
774                )
775                .collect();
776
777                let schema_owner_privileges = vec![rbac::owner_privilege(
778                    mz_sql::catalog::ObjectType::Schema,
779                    owner_id,
780                )];
781                let schema_default_privileges = state
782                    .default_privileges
783                    .get_applicable_privileges(
784                        owner_id,
785                        None,
786                        None,
787                        mz_sql::catalog::ObjectType::Schema,
788                    )
789                    .map(|item| item.mz_acl_item(owner_id))
790                    // Special default privilege on public schemas.
791                    .chain(std::iter::once(MzAclItem {
792                        grantee: RoleId::Public,
793                        grantor: owner_id,
794                        acl_mode: AclMode::USAGE,
795                    }));
796                let schema_privileges: Vec<_> = merge_mz_acl_items(
797                    schema_owner_privileges
798                        .into_iter()
799                        .chain(schema_default_privileges),
800                )
801                .collect();
802
803                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
804                let (database_id, _) = tx.insert_user_database(
805                    &name,
806                    owner_id,
807                    database_privileges.clone(),
808                    &temporary_oids,
809                )?;
810                let (schema_id, _) = tx.insert_user_schema(
811                    database_id,
812                    DEFAULT_SCHEMA,
813                    owner_id,
814                    schema_privileges.clone(),
815                    &temporary_oids,
816                )?;
817                CatalogState::add_to_audit_log(
818                    &state.system_configuration,
819                    oracle_write_ts,
820                    session,
821                    tx,
822                    audit_events,
823                    EventType::Create,
824                    ObjectType::Database,
825                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
826                        id: database_id.to_string(),
827                        name: name.clone(),
828                    }),
829                )?;
830                info!("create database {}", name);
831
832                CatalogState::add_to_audit_log(
833                    &state.system_configuration,
834                    oracle_write_ts,
835                    session,
836                    tx,
837                    audit_events,
838                    EventType::Create,
839                    ObjectType::Schema,
840                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
841                        id: schema_id.to_string(),
842                        name: DEFAULT_SCHEMA.to_string(),
843                        database_name: Some(name),
844                    }),
845                )?;
846            }
847            Op::CreateSchema {
848                database_id,
849                schema_name,
850                owner_id,
851            } => {
852                if is_reserved_name(&schema_name) {
853                    return Err(AdapterError::Catalog(Error::new(
854                        ErrorKind::ReservedSchemaName(schema_name),
855                    )));
856                }
857                let database_id = match database_id {
858                    ResolvedDatabaseSpecifier::Id(id) => id,
859                    ResolvedDatabaseSpecifier::Ambient => {
860                        return Err(AdapterError::Catalog(Error::new(
861                            ErrorKind::ReadOnlySystemSchema(schema_name),
862                        )));
863                    }
864                };
865                let owner_privileges = vec![rbac::owner_privilege(
866                    mz_sql::catalog::ObjectType::Schema,
867                    owner_id,
868                )];
869                let default_privileges = state
870                    .default_privileges
871                    .get_applicable_privileges(
872                        owner_id,
873                        Some(database_id),
874                        None,
875                        mz_sql::catalog::ObjectType::Schema,
876                    )
877                    .map(|item| item.mz_acl_item(owner_id));
878                let privileges: Vec<_> =
879                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
880                        .collect();
881                let (schema_id, _) = tx.insert_user_schema(
882                    database_id,
883                    &schema_name,
884                    owner_id,
885                    privileges.clone(),
886                    &state.get_temporary_oids().collect(),
887                )?;
888                CatalogState::add_to_audit_log(
889                    &state.system_configuration,
890                    oracle_write_ts,
891                    session,
892                    tx,
893                    audit_events,
894                    EventType::Create,
895                    ObjectType::Schema,
896                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
897                        id: schema_id.to_string(),
898                        name: schema_name.clone(),
899                        database_name: Some(state.database_by_id[&database_id].name.clone()),
900                    }),
901                )?;
902            }
903            Op::CreateRole { name, attributes } => {
904                if is_reserved_role_name(&name) {
905                    return Err(AdapterError::Catalog(Error::new(
906                        ErrorKind::ReservedRoleName(name),
907                    )));
908                }
909                let membership = RoleMembership::new();
910                let vars = RoleVars::default();
911                let (id, _) = tx.insert_user_role(
912                    name.clone(),
913                    attributes.clone(),
914                    membership.clone(),
915                    vars.clone(),
916                    &state.get_temporary_oids().collect(),
917                )?;
918                CatalogState::add_to_audit_log(
919                    &state.system_configuration,
920                    oracle_write_ts,
921                    session,
922                    tx,
923                    audit_events,
924                    EventType::Create,
925                    ObjectType::Role,
926                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
927                        id: id.to_string(),
928                        name: name.clone(),
929                    }),
930                )?;
931                info!("create role {}", name);
932            }
933            Op::CreateCluster {
934                id,
935                name,
936                introspection_sources,
937                owner_id,
938                config,
939            } => {
940                if is_reserved_name(&name) {
941                    return Err(AdapterError::Catalog(Error::new(
942                        ErrorKind::ReservedClusterName(name),
943                    )));
944                }
945                let owner_privileges = vec![rbac::owner_privilege(
946                    mz_sql::catalog::ObjectType::Cluster,
947                    owner_id,
948                )];
949                let default_privileges = state
950                    .default_privileges
951                    .get_applicable_privileges(
952                        owner_id,
953                        None,
954                        None,
955                        mz_sql::catalog::ObjectType::Cluster,
956                    )
957                    .map(|item| item.mz_acl_item(owner_id));
958                let privileges: Vec<_> =
959                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
960                        .collect();
961                let introspection_source_ids: Vec<_> = introspection_sources
962                    .iter()
963                    .map(|introspection_source| {
964                        Transaction::allocate_introspection_source_index_id(
965                            &id,
966                            introspection_source.variant,
967                        )
968                    })
969                    .collect();
970
971                let introspection_sources = introspection_sources
972                    .into_iter()
973                    .zip_eq(introspection_source_ids)
974                    .map(|(log, (item_id, gid))| (log, item_id, gid))
975                    .collect();
976
977                tx.insert_user_cluster(
978                    id,
979                    &name,
980                    introspection_sources,
981                    owner_id,
982                    privileges.clone(),
983                    config.clone().into(),
984                    &state.get_temporary_oids().collect(),
985                )?;
986                CatalogState::add_to_audit_log(
987                    &state.system_configuration,
988                    oracle_write_ts,
989                    session,
990                    tx,
991                    audit_events,
992                    EventType::Create,
993                    ObjectType::Cluster,
994                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
995                        id: id.to_string(),
996                        name: name.clone(),
997                    }),
998                )?;
999                info!("create cluster {}", name);
1000            }
1001            Op::CreateClusterReplica {
1002                cluster_id,
1003                name,
1004                config,
1005                owner_id,
1006                reason,
1007            } => {
1008                if is_reserved_name(&name) {
1009                    return Err(AdapterError::Catalog(Error::new(
1010                        ErrorKind::ReservedReplicaName(name),
1011                    )));
1012                }
1013                let cluster = state.get_cluster(cluster_id);
1014                let id =
1015                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1016                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1017                    size,
1018                    billed_as,
1019                    internal,
1020                    ..
1021                }) = &config.location
1022                {
1023                    let (reason, scheduling_policies) = reason.into_audit_log();
1024                    let details = EventDetails::CreateClusterReplicaV4(
1025                        mz_audit_log::CreateClusterReplicaV4 {
1026                            cluster_id: cluster_id.to_string(),
1027                            cluster_name: cluster.name.clone(),
1028                            replica_id: Some(id.to_string()),
1029                            replica_name: name.clone(),
1030                            logical_size: size.clone(),
1031                            billed_as: billed_as.clone(),
1032                            internal: *internal,
1033                            reason,
1034                            scheduling_policies,
1035                        },
1036                    );
1037                    CatalogState::add_to_audit_log(
1038                        &state.system_configuration,
1039                        oracle_write_ts,
1040                        session,
1041                        tx,
1042                        audit_events,
1043                        EventType::Create,
1044                        ObjectType::ClusterReplica,
1045                        details,
1046                    )?;
1047                }
1048            }
1049            Op::CreateItem {
1050                id,
1051                name,
1052                item,
1053                owner_id,
1054            } => {
1055                state.check_unstable_dependencies(&item)?;
1056
1057                match &item {
1058                    CatalogItem::Table(table) => {
1059                        let gids: Vec<_> = table.global_ids().collect();
1060                        assert_eq!(gids.len(), 1);
1061                        storage_collections_to_create.extend(gids);
1062                    }
1063                    CatalogItem::Source(source) => {
1064                        storage_collections_to_create.insert(source.global_id());
1065                    }
1066                    CatalogItem::MaterializedView(mv) => {
1067                        storage_collections_to_create.insert(mv.global_id());
1068                    }
1069                    CatalogItem::ContinualTask(ct) => {
1070                        storage_collections_to_create.insert(ct.global_id());
1071                    }
1072                    CatalogItem::Sink(sink) => {
1073                        storage_collections_to_create.insert(sink.global_id());
1074                    }
1075                    CatalogItem::Log(_)
1076                    | CatalogItem::View(_)
1077                    | CatalogItem::Index(_)
1078                    | CatalogItem::Type(_)
1079                    | CatalogItem::Func(_)
1080                    | CatalogItem::Secret(_)
1081                    | CatalogItem::Connection(_) => (),
1082                }
1083
1084                let system_user = session.map_or(false, |s| s.user().is_system_user());
1085                if !system_user {
1086                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1087                        let cluster_name = state.clusters_by_id[&id].name.clone();
1088                        return Err(AdapterError::Catalog(Error::new(
1089                            ErrorKind::ReadOnlyCluster(cluster_name),
1090                        )));
1091                    }
1092                }
1093
1094                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1095                let default_privileges = state
1096                    .default_privileges
1097                    .get_applicable_privileges(
1098                        owner_id,
1099                        name.qualifiers.database_spec.id(),
1100                        Some(name.qualifiers.schema_spec.into()),
1101                        item.typ().into(),
1102                    )
1103                    .map(|item| item.mz_acl_item(owner_id));
1104                // mz_support can read all progress sources.
1105                let progress_source_privilege = if item.is_progress_source() {
1106                    Some(MzAclItem {
1107                        grantee: MZ_SUPPORT_ROLE_ID,
1108                        grantor: owner_id,
1109                        acl_mode: AclMode::SELECT,
1110                    })
1111                } else {
1112                    None
1113                };
1114                let privileges: Vec<_> = merge_mz_acl_items(
1115                    owner_privileges
1116                        .into_iter()
1117                        .chain(default_privileges)
1118                        .chain(progress_source_privilege),
1119                )
1120                .collect();
1121
1122                let temporary_oids = state.get_temporary_oids().collect();
1123
1124                if item.is_temporary() {
1125                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1126                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1127                    {
1128                        return Err(AdapterError::Catalog(Error::new(
1129                            ErrorKind::InvalidTemporarySchema,
1130                        )));
1131                    }
1132                    let oid = tx.allocate_oid(&temporary_oids)?;
1133                    let item = TemporaryItem {
1134                        id,
1135                        oid,
1136                        name: name.clone(),
1137                        item: item.clone(),
1138                        owner_id,
1139                        privileges: PrivilegeMap::from_mz_acl_items(privileges),
1140                    };
1141                    temporary_item_updates.push((item, StateDiff::Addition));
1142                } else {
1143                    if let Some(temp_id) =
1144                        item.uses()
1145                            .iter()
1146                            .find(|id| match state.try_get_entry(*id) {
1147                                Some(entry) => entry.item().is_temporary(),
1148                                None => temporary_ids.contains(id),
1149                            })
1150                    {
1151                        let temp_item = state.get_entry(temp_id);
1152                        return Err(AdapterError::Catalog(Error::new(
1153                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1154                        )));
1155                    }
1156                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1157                        && !system_user
1158                    {
1159                        let schema_name = state
1160                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1161                            .schema;
1162                        return Err(AdapterError::Catalog(Error::new(
1163                            ErrorKind::ReadOnlySystemSchema(schema_name),
1164                        )));
1165                    }
1166                    let schema_id = name.qualifiers.schema_spec.clone().into();
1167                    let item_type = item.typ();
1168                    let (create_sql, global_id, versions) = item.to_serialized();
1169                    tx.insert_user_item(
1170                        id,
1171                        global_id,
1172                        schema_id,
1173                        &name.item,
1174                        create_sql,
1175                        owner_id,
1176                        privileges.clone(),
1177                        &temporary_oids,
1178                        versions,
1179                    )?;
1180                    info!(
1181                        "create {} {} ({})",
1182                        item_type,
1183                        state.resolve_full_name(&name, None),
1184                        id
1185                    );
1186                }
1187
1188                if Self::should_audit_log_item(&item) {
1189                    let name = Self::full_name_detail(
1190                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1191                    );
1192                    let details = match &item {
1193                        CatalogItem::Source(s) => {
1194                            let cluster_id = match s.data_source {
1195                                // Ingestion exports don't have their own cluster, but
1196                                // run on their ingestion's cluster.
1197                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1198                                    match state.get_entry(&ingestion_id).cluster_id() {
1199                                        Some(cluster_id) => Some(cluster_id.to_string()),
1200                                        None => None,
1201                                    }
1202                                }
1203                                _ => match item.cluster_id() {
1204                                    Some(cluster_id) => Some(cluster_id.to_string()),
1205                                    None => None,
1206                                },
1207                            };
1208
1209                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1210                                id: id.to_string(),
1211                                cluster_id,
1212                                name,
1213                                external_type: s.source_type().to_string(),
1214                            })
1215                        }
1216                        CatalogItem::Sink(s) => {
1217                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1218                                id: id.to_string(),
1219                                cluster_id: Some(s.cluster_id.to_string()),
1220                                name,
1221                                external_type: s.sink_type().to_string(),
1222                            })
1223                        }
1224                        CatalogItem::Index(i) => {
1225                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1226                                id: id.to_string(),
1227                                name,
1228                                cluster_id: i.cluster_id.to_string(),
1229                            })
1230                        }
1231                        CatalogItem::MaterializedView(mv) => {
1232                            EventDetails::CreateMaterializedViewV1(
1233                                mz_audit_log::CreateMaterializedViewV1 {
1234                                    id: id.to_string(),
1235                                    name,
1236                                    cluster_id: mv.cluster_id.to_string(),
1237                                },
1238                            )
1239                        }
1240                        _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1241                            id: id.to_string(),
1242                            name,
1243                        }),
1244                    };
1245                    CatalogState::add_to_audit_log(
1246                        &state.system_configuration,
1247                        oracle_write_ts,
1248                        session,
1249                        tx,
1250                        audit_events,
1251                        EventType::Create,
1252                        catalog_type_to_audit_object_type(item.typ()),
1253                        details,
1254                    )?;
1255                }
1256            }
1257            Op::CreateNetworkPolicy {
1258                rules,
1259                name,
1260                owner_id,
1261            } => {
1262                if state.network_policies_by_name.contains_key(&name) {
1263                    return Err(AdapterError::PlanError(PlanError::Catalog(
1264                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1265                    )));
1266                }
1267                if is_reserved_name(&name) {
1268                    return Err(AdapterError::Catalog(Error::new(
1269                        ErrorKind::ReservedNetworkPolicyName(name),
1270                    )));
1271                }
1272
1273                let owner_privileges = vec![rbac::owner_privilege(
1274                    mz_sql::catalog::ObjectType::NetworkPolicy,
1275                    owner_id,
1276                )];
1277                let default_privileges = state
1278                    .default_privileges
1279                    .get_applicable_privileges(
1280                        owner_id,
1281                        None,
1282                        None,
1283                        mz_sql::catalog::ObjectType::NetworkPolicy,
1284                    )
1285                    .map(|item| item.mz_acl_item(owner_id));
1286                let privileges: Vec<_> =
1287                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1288                        .collect();
1289
1290                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1291                let id = tx.insert_user_network_policy(
1292                    name.clone(),
1293                    rules,
1294                    privileges,
1295                    owner_id,
1296                    &temporary_oids,
1297                )?;
1298
1299                CatalogState::add_to_audit_log(
1300                    &state.system_configuration,
1301                    oracle_write_ts,
1302                    session,
1303                    tx,
1304                    audit_events,
1305                    EventType::Create,
1306                    ObjectType::NetworkPolicy,
1307                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1308                        id: id.to_string(),
1309                        name: name.clone(),
1310                    }),
1311                )?;
1312
1313                info!("created network policy {name} ({id})");
1314            }
1315            Op::Comment {
1316                object_id,
1317                sub_component,
1318                comment,
1319            } => {
1320                tx.update_comment(object_id, sub_component, comment)?;
1321                let entry = state.get_comment_id_entry(&object_id);
1322                let should_log = entry
1323                    .map(|entry| Self::should_audit_log_item(entry.item()))
1324                    // Things that aren't catalog entries can't be temp, so should be logged.
1325                    .unwrap_or(true);
1326                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1327                // comments won't be logged for now.
1328                if let (Some(conn_id), true) =
1329                    (session.map(|session| session.conn_id()), should_log)
1330                {
1331                    CatalogState::add_to_audit_log(
1332                        &state.system_configuration,
1333                        oracle_write_ts,
1334                        session,
1335                        tx,
1336                        audit_events,
1337                        EventType::Comment,
1338                        comment_id_to_audit_object_type(object_id),
1339                        EventDetails::IdNameV1(IdNameV1 {
1340                            // CommentObjectIds don't have a great string representation, but debug will do for now.
1341                            id: format!("{object_id:?}"),
1342                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1343                        }),
1344                    )?;
1345                }
1346            }
1347            Op::UpdateSourceReferences {
1348                source_id,
1349                references,
1350            } => {
1351                tx.update_source_references(
1352                    source_id,
1353                    references
1354                        .references
1355                        .into_iter()
1356                        .map(|reference| reference.into())
1357                        .collect(),
1358                    references.updated_at,
1359                )?;
1360            }
1361            Op::DropObjects(drop_object_infos) => {
1362                // Generate all of the objects that need to get dropped.
1363                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1364
1365                // Drop any associated comments.
1366                tx.drop_comments(&delta.comments)?;
1367
1368                // Drop any items.
1369                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1370                    delta
1371                        .items
1372                        .iter()
1373                        .map(|id| id)
1374                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1375                tx.remove_items(&durable_items_to_drop)?;
1376                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1377                    let entry = state.get_entry(&id);
1378                    (entry.clone().into(), StateDiff::Retraction)
1379                }));
1380
1381                for item_id in delta.items {
1382                    let entry = state.get_entry(&item_id);
1383
1384                    if entry.item().is_storage_collection() {
1385                        storage_collections_to_drop.extend(entry.global_ids());
1386                    }
1387
1388                    if state.source_references.contains_key(&item_id) {
1389                        tx.remove_source_references(item_id)?;
1390                    }
1391
1392                    if Self::should_audit_log_item(entry.item()) {
1393                        CatalogState::add_to_audit_log(
1394                            &state.system_configuration,
1395                            oracle_write_ts,
1396                            session,
1397                            tx,
1398                            audit_events,
1399                            EventType::Drop,
1400                            catalog_type_to_audit_object_type(entry.item().typ()),
1401                            EventDetails::IdFullNameV1(IdFullNameV1 {
1402                                id: item_id.to_string(),
1403                                name: Self::full_name_detail(&state.resolve_full_name(
1404                                    entry.name(),
1405                                    session.map(|session| session.conn_id()),
1406                                )),
1407                            }),
1408                        )?;
1409                    }
1410                    info!(
1411                        "drop {} {} ({})",
1412                        entry.item_type(),
1413                        state.resolve_full_name(entry.name(), entry.conn_id()),
1414                        item_id
1415                    );
1416                }
1417
1418                // Drop any schemas.
1419                let schemas = delta
1420                    .schemas
1421                    .iter()
1422                    .map(|(schema_spec, database_spec)| {
1423                        (SchemaId::from(schema_spec), *database_spec)
1424                    })
1425                    .collect();
1426                tx.remove_schemas(&schemas)?;
1427
1428                for (schema_spec, database_spec) in delta.schemas {
1429                    let schema = state.get_schema(
1430                        &database_spec,
1431                        &schema_spec,
1432                        session
1433                            .map(|session| session.conn_id())
1434                            .unwrap_or(&SYSTEM_CONN_ID),
1435                    );
1436
1437                    let schema_id = SchemaId::from(schema_spec);
1438                    let database_id = match database_spec {
1439                        ResolvedDatabaseSpecifier::Ambient => None,
1440                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1441                    };
1442
1443                    CatalogState::add_to_audit_log(
1444                        &state.system_configuration,
1445                        oracle_write_ts,
1446                        session,
1447                        tx,
1448                        audit_events,
1449                        EventType::Drop,
1450                        ObjectType::Schema,
1451                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1452                            id: schema_id.to_string(),
1453                            name: schema.name.schema.to_string(),
1454                            database_name: database_id
1455                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1456                        }),
1457                    )?;
1458                }
1459
1460                // Drop any databases.
1461                tx.remove_databases(&delta.databases)?;
1462
1463                for database_id in delta.databases {
1464                    let database = state.get_database(&database_id).clone();
1465
1466                    CatalogState::add_to_audit_log(
1467                        &state.system_configuration,
1468                        oracle_write_ts,
1469                        session,
1470                        tx,
1471                        audit_events,
1472                        EventType::Drop,
1473                        ObjectType::Database,
1474                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1475                            id: database_id.to_string(),
1476                            name: database.name.clone(),
1477                        }),
1478                    )?;
1479                }
1480
1481                // Drop any roles.
1482                tx.remove_user_roles(&delta.roles)?;
1483
1484                for role_id in delta.roles {
1485                    let role = state
1486                        .roles_by_id
1487                        .get(&role_id)
1488                        .expect("catalog out of sync");
1489
1490                    CatalogState::add_to_audit_log(
1491                        &state.system_configuration,
1492                        oracle_write_ts,
1493                        session,
1494                        tx,
1495                        audit_events,
1496                        EventType::Drop,
1497                        ObjectType::Role,
1498                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1499                            id: role.id.to_string(),
1500                            name: role.name.clone(),
1501                        }),
1502                    )?;
1503                    info!("drop role {}", role.name());
1504                }
1505
1506                // Drop any network policies.
1507                tx.remove_network_policies(&delta.network_policies)?;
1508
1509                for network_policy_id in delta.network_policies {
1510                    let policy = state
1511                        .network_policies_by_id
1512                        .get(&network_policy_id)
1513                        .expect("catalog out of sync");
1514
1515                    CatalogState::add_to_audit_log(
1516                        &state.system_configuration,
1517                        oracle_write_ts,
1518                        session,
1519                        tx,
1520                        audit_events,
1521                        EventType::Drop,
1522                        ObjectType::NetworkPolicy,
1523                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1524                            id: policy.id.to_string(),
1525                            name: policy.name.clone(),
1526                        }),
1527                    )?;
1528                    info!("drop network policy {}", policy.name.clone());
1529                }
1530
1531                // Drop any replicas.
1532                let replicas = delta.replicas.keys().copied().collect();
1533                tx.remove_cluster_replicas(&replicas)?;
1534
1535                for (replica_id, (cluster_id, reason)) in delta.replicas {
1536                    let cluster = state.get_cluster(cluster_id);
1537                    let replica = cluster.replica(replica_id).expect("Must exist");
1538
1539                    let (reason, scheduling_policies) = reason.into_audit_log();
1540                    let details =
1541                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1542                            cluster_id: cluster_id.to_string(),
1543                            cluster_name: cluster.name.clone(),
1544                            replica_id: Some(replica_id.to_string()),
1545                            replica_name: replica.name.clone(),
1546                            reason,
1547                            scheduling_policies,
1548                        });
1549                    CatalogState::add_to_audit_log(
1550                        &state.system_configuration,
1551                        oracle_write_ts,
1552                        session,
1553                        tx,
1554                        audit_events,
1555                        EventType::Drop,
1556                        ObjectType::ClusterReplica,
1557                        details,
1558                    )?;
1559                }
1560
1561                // Drop any clusters.
1562                tx.remove_clusters(&delta.clusters)?;
1563
1564                for cluster_id in delta.clusters {
1565                    let cluster = state.get_cluster(cluster_id);
1566
1567                    CatalogState::add_to_audit_log(
1568                        &state.system_configuration,
1569                        oracle_write_ts,
1570                        session,
1571                        tx,
1572                        audit_events,
1573                        EventType::Drop,
1574                        ObjectType::Cluster,
1575                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1576                            id: cluster.id.to_string(),
1577                            name: cluster.name.clone(),
1578                        }),
1579                    )?;
1580                }
1581            }
1582            Op::GrantRole {
1583                role_id,
1584                member_id,
1585                grantor_id,
1586            } => {
1587                state.ensure_not_reserved_role(&member_id)?;
1588                state.ensure_grantable_role(&role_id)?;
1589                if state.collect_role_membership(&role_id).contains(&member_id) {
1590                    let group_role = state.get_role(&role_id);
1591                    let member_role = state.get_role(&member_id);
1592                    return Err(AdapterError::Catalog(Error::new(
1593                        ErrorKind::CircularRoleMembership {
1594                            role_name: group_role.name().to_string(),
1595                            member_name: member_role.name().to_string(),
1596                        },
1597                    )));
1598                }
1599                let mut member_role = state.get_role(&member_id).clone();
1600                member_role.membership.map.insert(role_id, grantor_id);
1601                tx.update_role(member_id, member_role.into())?;
1602
1603                CatalogState::add_to_audit_log(
1604                    &state.system_configuration,
1605                    oracle_write_ts,
1606                    session,
1607                    tx,
1608                    audit_events,
1609                    EventType::Grant,
1610                    ObjectType::Role,
1611                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1612                        role_id: role_id.to_string(),
1613                        member_id: member_id.to_string(),
1614                        grantor_id: grantor_id.to_string(),
1615                        executed_by: session
1616                            .map(|session| session.authenticated_role_id())
1617                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1618                            .to_string(),
1619                    }),
1620                )?;
1621            }
1622            Op::RevokeRole {
1623                role_id,
1624                member_id,
1625                grantor_id,
1626            } => {
1627                state.ensure_not_reserved_role(&member_id)?;
1628                state.ensure_grantable_role(&role_id)?;
1629                let mut member_role = state.get_role(&member_id).clone();
1630                member_role.membership.map.remove(&role_id);
1631                tx.update_role(member_id, member_role.into())?;
1632
1633                CatalogState::add_to_audit_log(
1634                    &state.system_configuration,
1635                    oracle_write_ts,
1636                    session,
1637                    tx,
1638                    audit_events,
1639                    EventType::Revoke,
1640                    ObjectType::Role,
1641                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1642                        role_id: role_id.to_string(),
1643                        member_id: member_id.to_string(),
1644                        grantor_id: grantor_id.to_string(),
1645                        executed_by: session
1646                            .map(|session| session.authenticated_role_id())
1647                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1648                            .to_string(),
1649                    }),
1650                )?;
1651            }
1652            Op::UpdatePrivilege {
1653                target_id,
1654                privilege,
1655                variant,
1656            } => {
1657                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1658                    UpdatePrivilegeVariant::Grant => {
1659                        privileges.grant(privilege);
1660                    }
1661                    UpdatePrivilegeVariant::Revoke => {
1662                        privileges.revoke(&privilege);
1663                    }
1664                };
1665                match &target_id {
1666                    SystemObjectId::Object(object_id) => match object_id {
1667                        ObjectId::Cluster(id) => {
1668                            let mut cluster = state.get_cluster(*id).clone();
1669                            update_privilege_fn(&mut cluster.privileges);
1670                            tx.update_cluster(*id, cluster.into())?;
1671                        }
1672                        ObjectId::Database(id) => {
1673                            let mut database = state.get_database(id).clone();
1674                            update_privilege_fn(&mut database.privileges);
1675                            tx.update_database(*id, database.into())?;
1676                        }
1677                        ObjectId::NetworkPolicy(id) => {
1678                            let mut policy = state.get_network_policy(id).clone();
1679                            update_privilege_fn(&mut policy.privileges);
1680                            tx.update_network_policy(*id, policy.into())?;
1681                        }
1682                        ObjectId::Schema((database_spec, schema_spec)) => {
1683                            let schema_id = schema_spec.clone().into();
1684                            let mut schema = state
1685                                .get_schema(
1686                                    database_spec,
1687                                    schema_spec,
1688                                    session
1689                                        .map(|session| session.conn_id())
1690                                        .unwrap_or(&SYSTEM_CONN_ID),
1691                                )
1692                                .clone();
1693                            update_privilege_fn(&mut schema.privileges);
1694                            tx.update_schema(schema_id, schema.into())?;
1695                        }
1696                        ObjectId::Item(id) => {
1697                            let entry = state.get_entry(id);
1698                            let mut new_entry = entry.clone();
1699                            update_privilege_fn(&mut new_entry.privileges);
1700                            if !new_entry.item().is_temporary() {
1701                                tx.update_item(*id, new_entry.into())?;
1702                            } else {
1703                                temporary_item_updates
1704                                    .push((entry.clone().into(), StateDiff::Retraction));
1705                                temporary_item_updates
1706                                    .push((new_entry.into(), StateDiff::Addition));
1707                            }
1708                        }
1709                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1710                    },
1711                    SystemObjectId::System => {
1712                        let mut system_privileges = state.system_privileges.clone();
1713                        update_privilege_fn(&mut system_privileges);
1714                        let new_privilege =
1715                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1716                        tx.set_system_privilege(
1717                            privilege.grantee,
1718                            privilege.grantor,
1719                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
1720                        )?;
1721                    }
1722                }
1723                let object_type = state.get_system_object_type(&target_id);
1724                let object_id_str = match &target_id {
1725                    SystemObjectId::System => "SYSTEM".to_string(),
1726                    SystemObjectId::Object(id) => id.to_string(),
1727                };
1728                CatalogState::add_to_audit_log(
1729                    &state.system_configuration,
1730                    oracle_write_ts,
1731                    session,
1732                    tx,
1733                    audit_events,
1734                    variant.into(),
1735                    system_object_type_to_audit_object_type(&object_type),
1736                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1737                        object_id: object_id_str,
1738                        grantee_id: privilege.grantee.to_string(),
1739                        grantor_id: privilege.grantor.to_string(),
1740                        privileges: privilege.acl_mode.to_string(),
1741                    }),
1742                )?;
1743            }
1744            Op::UpdateDefaultPrivilege {
1745                privilege_object,
1746                privilege_acl_item,
1747                variant,
1748            } => {
1749                let mut default_privileges = state.default_privileges.clone();
1750                match variant {
1751                    UpdatePrivilegeVariant::Grant => default_privileges
1752                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
1753                    UpdatePrivilegeVariant::Revoke => {
1754                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
1755                    }
1756                }
1757                let new_acl_mode = default_privileges
1758                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1759                tx.set_default_privilege(
1760                    privilege_object.role_id,
1761                    privilege_object.database_id,
1762                    privilege_object.schema_id,
1763                    privilege_object.object_type,
1764                    privilege_acl_item.grantee,
1765                    new_acl_mode.cloned(),
1766                )?;
1767                CatalogState::add_to_audit_log(
1768                    &state.system_configuration,
1769                    oracle_write_ts,
1770                    session,
1771                    tx,
1772                    audit_events,
1773                    variant.into(),
1774                    object_type_to_audit_object_type(privilege_object.object_type),
1775                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1776                        role_id: privilege_object.role_id.to_string(),
1777                        database_id: privilege_object.database_id.map(|id| id.to_string()),
1778                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1779                        grantee_id: privilege_acl_item.grantee.to_string(),
1780                        privileges: privilege_acl_item.acl_mode.to_string(),
1781                    }),
1782                )?;
1783            }
1784            Op::RenameCluster {
1785                id,
1786                name,
1787                to_name,
1788                check_reserved_names,
1789            } => {
1790                if id.is_system() {
1791                    return Err(AdapterError::Catalog(Error::new(
1792                        ErrorKind::ReadOnlyCluster(name.clone()),
1793                    )));
1794                }
1795                if check_reserved_names && is_reserved_name(&to_name) {
1796                    return Err(AdapterError::Catalog(Error::new(
1797                        ErrorKind::ReservedClusterName(to_name),
1798                    )));
1799                }
1800                tx.rename_cluster(id, &name, &to_name)?;
1801                CatalogState::add_to_audit_log(
1802                    &state.system_configuration,
1803                    oracle_write_ts,
1804                    session,
1805                    tx,
1806                    audit_events,
1807                    EventType::Alter,
1808                    ObjectType::Cluster,
1809                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1810                        id: id.to_string(),
1811                        old_name: name.clone(),
1812                        new_name: to_name.clone(),
1813                    }),
1814                )?;
1815                info!("rename cluster {name} to {to_name}");
1816            }
1817            Op::RenameClusterReplica {
1818                cluster_id,
1819                replica_id,
1820                name,
1821                to_name,
1822            } => {
1823                if is_reserved_name(&to_name) {
1824                    return Err(AdapterError::Catalog(Error::new(
1825                        ErrorKind::ReservedReplicaName(to_name),
1826                    )));
1827                }
1828                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1829                CatalogState::add_to_audit_log(
1830                    &state.system_configuration,
1831                    oracle_write_ts,
1832                    session,
1833                    tx,
1834                    audit_events,
1835                    EventType::Alter,
1836                    ObjectType::ClusterReplica,
1837                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1838                        cluster_id: cluster_id.to_string(),
1839                        replica_id: replica_id.to_string(),
1840                        old_name: name.replica.as_str().to_string(),
1841                        new_name: to_name.clone(),
1842                    }),
1843                )?;
1844                info!("rename cluster replica {name} to {to_name}");
1845            }
1846            Op::RenameItem {
1847                id,
1848                to_name,
1849                current_full_name,
1850            } => {
1851                let mut updates = Vec::new();
1852
1853                let entry = state.get_entry(&id);
1854                if let CatalogItem::Type(_) = entry.item() {
1855                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1856                        current_full_name.to_string(),
1857                    ))));
1858                }
1859
1860                if entry.id().is_system() {
1861                    let name = state
1862                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1863                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1864                        name.to_string(),
1865                    ))));
1866                }
1867
1868                let mut to_full_name = current_full_name.clone();
1869                to_full_name.item.clone_from(&to_name);
1870
1871                let mut to_qualified_name = entry.name().clone();
1872                to_qualified_name.item.clone_from(&to_name);
1873
1874                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1875                    id: id.to_string(),
1876                    old_name: Self::full_name_detail(&current_full_name),
1877                    new_name: Self::full_name_detail(&to_full_name),
1878                });
1879                if Self::should_audit_log_item(entry.item()) {
1880                    CatalogState::add_to_audit_log(
1881                        &state.system_configuration,
1882                        oracle_write_ts,
1883                        session,
1884                        tx,
1885                        audit_events,
1886                        EventType::Alter,
1887                        catalog_type_to_audit_object_type(entry.item().typ()),
1888                        details,
1889                    )?;
1890                }
1891
1892                // Rename item itself.
1893                let mut new_entry = entry.clone();
1894                new_entry.name.item.clone_from(&to_name);
1895                new_entry.item = entry
1896                    .item()
1897                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
1898                    .map_err(|e| {
1899                        Error::new(ErrorKind::from(AmbiguousRename {
1900                            depender: state
1901                                .resolve_full_name(entry.name(), entry.conn_id())
1902                                .to_string(),
1903                            dependee: state
1904                                .resolve_full_name(entry.name(), entry.conn_id())
1905                                .to_string(),
1906                            message: e,
1907                        }))
1908                    })?;
1909
1910                for id in entry.referenced_by() {
1911                    let dependent_item = state.get_entry(id);
1912                    let mut to_entry = dependent_item.clone();
1913                    to_entry.item = dependent_item
1914                        .item()
1915                        .rename_item_refs(
1916                            current_full_name.clone(),
1917                            to_full_name.item.clone(),
1918                            false,
1919                        )
1920                        .map_err(|e| {
1921                            Error::new(ErrorKind::from(AmbiguousRename {
1922                                depender: state
1923                                    .resolve_full_name(
1924                                        dependent_item.name(),
1925                                        dependent_item.conn_id(),
1926                                    )
1927                                    .to_string(),
1928                                dependee: state
1929                                    .resolve_full_name(entry.name(), entry.conn_id())
1930                                    .to_string(),
1931                                message: e,
1932                            }))
1933                        })?;
1934
1935                    if !to_entry.item().is_temporary() {
1936                        tx.update_item(*id, to_entry.into())?;
1937                    } else {
1938                        temporary_item_updates
1939                            .push((dependent_item.clone().into(), StateDiff::Retraction));
1940                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
1941                    }
1942                    updates.push(*id);
1943                }
1944                if !new_entry.item().is_temporary() {
1945                    tx.update_item(id, new_entry.into())?;
1946                } else {
1947                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
1948                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
1949                }
1950
1951                updates.push(id);
1952                for id in updates {
1953                    Self::log_update(state, &id);
1954                }
1955            }
1956            Op::RenameSchema {
1957                database_spec,
1958                schema_spec,
1959                new_name,
1960                check_reserved_names,
1961            } => {
1962                if check_reserved_names && is_reserved_name(&new_name) {
1963                    return Err(AdapterError::Catalog(Error::new(
1964                        ErrorKind::ReservedSchemaName(new_name),
1965                    )));
1966                }
1967
1968                let conn_id = session
1969                    .map(|session| session.conn_id())
1970                    .unwrap_or(&SYSTEM_CONN_ID);
1971
1972                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
1973                let cur_name = schema.name().schema.clone();
1974
1975                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
1976                    return Err(AdapterError::Catalog(Error::new(
1977                        ErrorKind::AmbientSchemaRename(cur_name),
1978                    )));
1979                };
1980                let database = state.get_database(&database_id);
1981                let database_name = &database.name;
1982
1983                let mut updates = Vec::new();
1984                let mut items_to_update = BTreeMap::new();
1985
1986                let mut update_item = |id| {
1987                    if items_to_update.contains_key(id) {
1988                        return Ok(());
1989                    }
1990
1991                    let entry = state.get_entry(id);
1992
1993                    // Update our item.
1994                    let mut new_entry = entry.clone();
1995                    new_entry.item = entry
1996                        .item
1997                        .rename_schema_refs(database_name, &cur_name, &new_name)
1998                        .map_err(|(s, _i)| {
1999                            Error::new(ErrorKind::from(AmbiguousRename {
2000                                depender: state
2001                                    .resolve_full_name(entry.name(), entry.conn_id())
2002                                    .to_string(),
2003                                dependee: format!("{database_name}.{cur_name}"),
2004                                message: format!("ambiguous reference to schema named {s}"),
2005                            }))
2006                        })?;
2007
2008                    // Queue updates for Catalog storage and Builtin Tables.
2009                    if !new_entry.item().is_temporary() {
2010                        items_to_update.insert(*id, new_entry.into());
2011                    } else {
2012                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2013                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2014                    }
2015                    updates.push(id);
2016
2017                    Ok::<_, AdapterError>(())
2018                };
2019
2020                // Update all of the items in the schema.
2021                for (_name, item_id) in &schema.items {
2022                    // Update the item itself.
2023                    update_item(item_id)?;
2024
2025                    // Update everything that depends on this item.
2026                    for id in state.get_entry(item_id).referenced_by() {
2027                        update_item(id)?;
2028                    }
2029                }
2030                // Note: When updating the transaction it's very important that we update the
2031                // items as a whole group, otherwise we exhibit quadratic behavior.
2032                tx.update_items(items_to_update)?;
2033
2034                // Renaming temporary schemas is not supported.
2035                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2036                    let schema_name = schema.name().schema.clone();
2037                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2038                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2039                    )));
2040                };
2041
2042                // Add an entry to the audit log.
2043                let database_name = database_spec
2044                    .id()
2045                    .map(|id| state.get_database(&id).name.clone());
2046                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2047                    id: schema_id.to_string(),
2048                    old_name: schema.name().schema.clone(),
2049                    new_name: new_name.clone(),
2050                    database_name,
2051                });
2052                CatalogState::add_to_audit_log(
2053                    &state.system_configuration,
2054                    oracle_write_ts,
2055                    session,
2056                    tx,
2057                    audit_events,
2058                    EventType::Alter,
2059                    mz_audit_log::ObjectType::Schema,
2060                    details,
2061                )?;
2062
2063                // Update the schema itself.
2064                let mut new_schema = schema.clone();
2065                new_schema.name.schema.clone_from(&new_name);
2066                tx.update_schema(schema_id, new_schema.into())?;
2067
2068                for id in updates {
2069                    Self::log_update(state, id);
2070                }
2071            }
2072            Op::UpdateOwner { id, new_owner } => {
2073                let conn_id = session
2074                    .map(|session| session.conn_id())
2075                    .unwrap_or(&SYSTEM_CONN_ID);
2076                let old_owner = state
2077                    .get_owner_id(&id, conn_id)
2078                    .expect("cannot update the owner of an object without an owner");
2079                match &id {
2080                    ObjectId::Cluster(id) => {
2081                        let mut cluster = state.get_cluster(*id).clone();
2082                        if id.is_system() {
2083                            return Err(AdapterError::Catalog(Error::new(
2084                                ErrorKind::ReadOnlyCluster(cluster.name),
2085                            )));
2086                        }
2087                        Self::update_privilege_owners(
2088                            &mut cluster.privileges,
2089                            cluster.owner_id,
2090                            new_owner,
2091                        );
2092                        cluster.owner_id = new_owner;
2093                        tx.update_cluster(*id, cluster.into())?;
2094                    }
2095                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2096                        let cluster = state.get_cluster(*cluster_id);
2097                        let mut replica = cluster
2098                            .replica(*replica_id)
2099                            .expect("catalog out of sync")
2100                            .clone();
2101                        if replica_id.is_system() {
2102                            return Err(AdapterError::Catalog(Error::new(
2103                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2104                            )));
2105                        }
2106                        replica.owner_id = new_owner;
2107                        tx.update_cluster_replica(*replica_id, replica.into())?;
2108                    }
2109                    ObjectId::Database(id) => {
2110                        let mut database = state.get_database(id).clone();
2111                        if id.is_system() {
2112                            return Err(AdapterError::Catalog(Error::new(
2113                                ErrorKind::ReadOnlyDatabase(database.name),
2114                            )));
2115                        }
2116                        Self::update_privilege_owners(
2117                            &mut database.privileges,
2118                            database.owner_id,
2119                            new_owner,
2120                        );
2121                        database.owner_id = new_owner;
2122                        tx.update_database(*id, database.clone().into())?;
2123                    }
2124                    ObjectId::Schema((database_spec, schema_spec)) => {
2125                        let schema_id: SchemaId = schema_spec.clone().into();
2126                        let mut schema = state
2127                            .get_schema(database_spec, schema_spec, conn_id)
2128                            .clone();
2129                        if schema_id.is_system() {
2130                            let name = schema.name();
2131                            let full_name = state.resolve_full_schema_name(name);
2132                            return Err(AdapterError::Catalog(Error::new(
2133                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2134                            )));
2135                        }
2136                        Self::update_privilege_owners(
2137                            &mut schema.privileges,
2138                            schema.owner_id,
2139                            new_owner,
2140                        );
2141                        schema.owner_id = new_owner;
2142                        tx.update_schema(schema_id, schema.into())?;
2143                    }
2144                    ObjectId::Item(id) => {
2145                        let entry = state.get_entry(id);
2146                        let mut new_entry = entry.clone();
2147                        if id.is_system() {
2148                            let full_name = state.resolve_full_name(
2149                                new_entry.name(),
2150                                session.map(|session| session.conn_id()),
2151                            );
2152                            return Err(AdapterError::Catalog(Error::new(
2153                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2154                            )));
2155                        }
2156                        Self::update_privilege_owners(
2157                            &mut new_entry.privileges,
2158                            new_entry.owner_id,
2159                            new_owner,
2160                        );
2161                        new_entry.owner_id = new_owner;
2162                        if !new_entry.item().is_temporary() {
2163                            tx.update_item(*id, new_entry.into())?;
2164                        } else {
2165                            temporary_item_updates
2166                                .push((entry.clone().into(), StateDiff::Retraction));
2167                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2168                        }
2169                    }
2170                    ObjectId::NetworkPolicy(id) => {
2171                        let mut policy = state.get_network_policy(id).clone();
2172                        if id.is_system() {
2173                            return Err(AdapterError::Catalog(Error::new(
2174                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2175                            )));
2176                        }
2177                        Self::update_privilege_owners(
2178                            &mut policy.privileges,
2179                            policy.owner_id,
2180                            new_owner,
2181                        );
2182                        policy.owner_id = new_owner;
2183                        tx.update_network_policy(*id, policy.into())?;
2184                    }
2185                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2186                }
2187                let object_type = state.get_object_type(&id);
2188                CatalogState::add_to_audit_log(
2189                    &state.system_configuration,
2190                    oracle_write_ts,
2191                    session,
2192                    tx,
2193                    audit_events,
2194                    EventType::Alter,
2195                    object_type_to_audit_object_type(object_type),
2196                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2197                        object_id: id.to_string(),
2198                        old_owner_id: old_owner.to_string(),
2199                        new_owner_id: new_owner.to_string(),
2200                    }),
2201                )?;
2202            }
2203            Op::UpdateClusterConfig { id, name, config } => {
2204                let mut cluster = state.get_cluster(id).clone();
2205                cluster.config = config;
2206                tx.update_cluster(id, cluster.into())?;
2207                info!("update cluster {}", name);
2208
2209                CatalogState::add_to_audit_log(
2210                    &state.system_configuration,
2211                    oracle_write_ts,
2212                    session,
2213                    tx,
2214                    audit_events,
2215                    EventType::Alter,
2216                    ObjectType::Cluster,
2217                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2218                        id: id.to_string(),
2219                        name,
2220                    }),
2221                )?;
2222            }
2223            Op::UpdateClusterReplicaConfig {
2224                replica_id,
2225                cluster_id,
2226                config,
2227            } => {
2228                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2229                info!("update replica {}", replica.name);
2230                tx.update_cluster_replica(
2231                    replica_id,
2232                    mz_catalog::durable::ClusterReplica {
2233                        cluster_id,
2234                        replica_id,
2235                        name: replica.name.clone(),
2236                        config: config.clone().into(),
2237                        owner_id: replica.owner_id,
2238                    },
2239                )?;
2240            }
2241            Op::UpdateItem { id, name, to_item } => {
2242                let mut entry = state.get_entry(&id).clone();
2243                entry.name = name.clone();
2244                entry.item = to_item.clone();
2245                tx.update_item(id, entry.into())?;
2246
2247                if Self::should_audit_log_item(&to_item) {
2248                    let mut full_name = Self::full_name_detail(
2249                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2250                    );
2251                    full_name.item = name.item;
2252
2253                    CatalogState::add_to_audit_log(
2254                        &state.system_configuration,
2255                        oracle_write_ts,
2256                        session,
2257                        tx,
2258                        audit_events,
2259                        EventType::Alter,
2260                        catalog_type_to_audit_object_type(to_item.typ()),
2261                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2262                            id: id.to_string(),
2263                            name: full_name,
2264                        }),
2265                    )?;
2266                }
2267
2268                Self::log_update(state, &id);
2269            }
2270            Op::UpdateSystemConfiguration { name, value } => {
2271                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2272                tx.upsert_system_config(&name, parsed_value.clone())?;
2273                // This mirrors the `enable_0dt_deployment` "system var" into the catalog
2274                // storage "config" collection so that we can toggle the flag with
2275                // Launch Darkly, but use it in boot before Launch Darkly is available.
2276                if name == ENABLE_0DT_DEPLOYMENT.name() {
2277                    let enable_0dt_deployment =
2278                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2279                    tx.set_enable_0dt_deployment(enable_0dt_deployment)?;
2280                } else if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2281                    let with_0dt_deployment_max_wait =
2282                        Duration::parse(VarInput::Flat(&parsed_value))
2283                            .expect("parsing succeeded above");
2284                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2285                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2286                    let with_0dt_deployment_ddl_check_interval =
2287                        Duration::parse(VarInput::Flat(&parsed_value))
2288                            .expect("parsing succeeded above");
2289                    tx.set_0dt_deployment_ddl_check_interval(
2290                        with_0dt_deployment_ddl_check_interval,
2291                    )?;
2292                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2293                    let panic_after_timeout =
2294                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2295                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2296                }
2297
2298                CatalogState::add_to_audit_log(
2299                    &state.system_configuration,
2300                    oracle_write_ts,
2301                    session,
2302                    tx,
2303                    audit_events,
2304                    EventType::Alter,
2305                    ObjectType::System,
2306                    EventDetails::SetV1(mz_audit_log::SetV1 {
2307                        name,
2308                        value: Some(value.borrow().to_vec().join(", ")),
2309                    }),
2310                )?;
2311            }
2312            Op::ResetSystemConfiguration { name } => {
2313                tx.remove_system_config(&name);
2314                // This mirrors the `enable_0dt_deployment` "system var" into the catalog
2315                // storage "config" collection so that we can toggle the flag with
2316                // Launch Darkly, but use it in boot before Launch Darkly is available.
2317                if name == ENABLE_0DT_DEPLOYMENT.name() {
2318                    tx.reset_enable_0dt_deployment()?;
2319                } else if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2320                    tx.reset_0dt_deployment_max_wait()?;
2321                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2322                    tx.reset_0dt_deployment_ddl_check_interval()?;
2323                }
2324
2325                CatalogState::add_to_audit_log(
2326                    &state.system_configuration,
2327                    oracle_write_ts,
2328                    session,
2329                    tx,
2330                    audit_events,
2331                    EventType::Alter,
2332                    ObjectType::System,
2333                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2334                )?;
2335            }
2336            Op::ResetAllSystemConfiguration => {
2337                tx.clear_system_configs();
2338                tx.reset_enable_0dt_deployment()?;
2339                tx.reset_0dt_deployment_max_wait()?;
2340                tx.reset_0dt_deployment_ddl_check_interval()?;
2341
2342                CatalogState::add_to_audit_log(
2343                    &state.system_configuration,
2344                    oracle_write_ts,
2345                    session,
2346                    tx,
2347                    audit_events,
2348                    EventType::Alter,
2349                    ObjectType::System,
2350                    EventDetails::ResetAllV1,
2351                )?;
2352            }
2353            Op::WeirdStorageUsageUpdates {
2354                object_id,
2355                size_bytes,
2356                collection_timestamp,
2357            } => {
2358                let id = tx.allocate_storage_usage_ids()?;
2359                let metric =
2360                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2361                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2362                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2363                weird_builtin_table_update = Some(builtin_table_update);
2364            }
2365        };
2366        Ok((weird_builtin_table_update, temporary_item_updates))
2367    }
2368
2369    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2370        let entry = state.get_entry(id);
2371        info!(
2372            "update {} {} ({})",
2373            entry.item_type(),
2374            state.resolve_full_name(entry.name(), entry.conn_id()),
2375            id
2376        );
2377    }
2378
2379    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2380    /// implementation:
2381    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2382    fn update_privilege_owners(
2383        privileges: &mut PrivilegeMap,
2384        old_owner: RoleId,
2385        new_owner: RoleId,
2386    ) {
2387        // TODO(jkosh44) Would be nice not to clone every privilege.
2388        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2389
2390        let mut new_present = false;
2391        for privilege in flat_privileges.iter_mut() {
2392            // Old owner's granted privilege are updated to be granted by the new
2393            // owner.
2394            if privilege.grantor == old_owner {
2395                privilege.grantor = new_owner;
2396            } else if privilege.grantor == new_owner {
2397                new_present = true;
2398            }
2399            // Old owner's privileges is given to the new owner.
2400            if privilege.grantee == old_owner {
2401                privilege.grantee = new_owner;
2402            } else if privilege.grantee == new_owner {
2403                new_present = true;
2404            }
2405        }
2406
2407        // If the old privilege list contained references to the new owner, we may
2408        // have created duplicate entries. Here we try and consolidate them. This
2409        // is inspired by PostgreSQL's algorithm but not identical.
2410        if new_present {
2411            // Group privileges by (grantee, grantor).
2412            let privilege_map: BTreeMap<_, Vec<_>> =
2413                flat_privileges
2414                    .into_iter()
2415                    .fold(BTreeMap::new(), |mut accum, privilege| {
2416                        accum
2417                            .entry((privilege.grantee, privilege.grantor))
2418                            .or_default()
2419                            .push(privilege);
2420                        accum
2421                    });
2422
2423            // Consolidate and update all privileges.
2424            flat_privileges = privilege_map
2425                .into_iter()
2426                .map(|((grantee, grantor), values)|
2427                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2428                    values.into_iter().fold(
2429                        MzAclItem::empty(grantee, grantor),
2430                        |mut accum, mz_aclitem| {
2431                            accum.acl_mode =
2432                                accum.acl_mode.union(mz_aclitem.acl_mode);
2433                            accum
2434                        },
2435                    ))
2436                .collect();
2437        }
2438
2439        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2440    }
2441}
2442
2443/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2444///
2445/// Note: Previously we used to omit a single `Op::DropObject` for every object
2446/// we needed to drop. But removing a batch of objects from a durable Catalog
2447/// Transaction is O(n) where `n` is the number of objects that exist in the
2448/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
2449/// `DROP ... CASCADE` statement.
2450#[derive(Debug, Default)]
2451pub(crate) struct ObjectsToDrop {
2452    pub comments: BTreeSet<CommentObjectId>,
2453    pub databases: BTreeSet<DatabaseId>,
2454    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2455    pub clusters: BTreeSet<ClusterId>,
2456    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2457    pub roles: BTreeSet<RoleId>,
2458    pub items: Vec<CatalogItemId>,
2459    pub network_policies: BTreeSet<NetworkPolicyId>,
2460}
2461
2462impl ObjectsToDrop {
2463    pub fn generate(
2464        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2465        state: &CatalogState,
2466        session: Option<&ConnMeta>,
2467    ) -> Result<Self, AdapterError> {
2468        let mut delta = ObjectsToDrop::default();
2469
2470        for drop_object_info in drop_object_infos {
2471            delta.add_item(drop_object_info, state, session)?;
2472        }
2473
2474        Ok(delta)
2475    }
2476
2477    fn add_item(
2478        &mut self,
2479        drop_object_info: DropObjectInfo,
2480        state: &CatalogState,
2481        session: Option<&ConnMeta>,
2482    ) -> Result<(), AdapterError> {
2483        self.comments
2484            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2485
2486        match drop_object_info {
2487            DropObjectInfo::Database(database_id) => {
2488                let database = &state.database_by_id[&database_id];
2489                if database_id.is_system() {
2490                    return Err(AdapterError::Catalog(Error::new(
2491                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2492                    )));
2493                }
2494
2495                self.databases.insert(database_id);
2496            }
2497            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2498                let schema = state.get_schema(
2499                    &database_spec,
2500                    &schema_spec,
2501                    session
2502                        .map(|session| session.conn_id())
2503                        .unwrap_or(&SYSTEM_CONN_ID),
2504                );
2505                let schema_id: SchemaId = schema_spec.into();
2506                if schema_id.is_system() {
2507                    let name = schema.name();
2508                    let full_name = state.resolve_full_schema_name(name);
2509                    return Err(AdapterError::Catalog(Error::new(
2510                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2511                    )));
2512                }
2513
2514                self.schemas.insert(schema_spec, database_spec);
2515            }
2516            DropObjectInfo::Role(role_id) => {
2517                let name = state.get_role(&role_id).name().to_string();
2518                if role_id.is_system() || role_id.is_predefined() {
2519                    return Err(AdapterError::Catalog(Error::new(
2520                        ErrorKind::ReservedRoleName(name.clone()),
2521                    )));
2522                }
2523                state.ensure_not_reserved_role(&role_id)?;
2524
2525                self.roles.insert(role_id);
2526            }
2527            DropObjectInfo::Cluster(cluster_id) => {
2528                let cluster = state.get_cluster(cluster_id);
2529                let name = &cluster.name;
2530                if cluster_id.is_system() {
2531                    return Err(AdapterError::Catalog(Error::new(
2532                        ErrorKind::ReadOnlyCluster(name.clone()),
2533                    )));
2534                }
2535
2536                self.clusters.insert(cluster_id);
2537            }
2538            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2539                let cluster = state.get_cluster(cluster_id);
2540                let replica = cluster.replica(replica_id).expect("Must exist");
2541
2542                self.replicas
2543                    .insert(replica.replica_id, (cluster.id, reason));
2544            }
2545            DropObjectInfo::Item(item_id) => {
2546                let entry = state.get_entry(&item_id);
2547                if item_id.is_system() {
2548                    let name = entry.name();
2549                    let full_name =
2550                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
2551                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2552                        full_name.to_string(),
2553                    ))));
2554                }
2555
2556                self.items.push(item_id);
2557            }
2558            DropObjectInfo::NetworkPolicy(network_policy_id) => {
2559                let policy = state.get_network_policy(&network_policy_id);
2560                let name = &policy.name;
2561                if network_policy_id.is_system() {
2562                    return Err(AdapterError::Catalog(Error::new(
2563                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2564                    )));
2565                }
2566
2567                self.network_policies.insert(network_policy_id);
2568            }
2569        }
2570
2571        Ok(())
2572    }
2573}
2574
2575#[cfg(test)]
2576mod tests {
2577    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
2578    use mz_repr::role_id::RoleId;
2579
2580    use crate::catalog::Catalog;
2581
2582    #[mz_ore::test]
2583    fn test_update_privilege_owners() {
2584        let old_owner = RoleId::User(1);
2585        let new_owner = RoleId::User(2);
2586        let other_role = RoleId::User(3);
2587
2588        // older owner exists as grantor.
2589        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2590            MzAclItem {
2591                grantee: other_role,
2592                grantor: old_owner,
2593                acl_mode: AclMode::UPDATE,
2594            },
2595            MzAclItem {
2596                grantee: other_role,
2597                grantor: new_owner,
2598                acl_mode: AclMode::SELECT,
2599            },
2600        ]);
2601        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2602        assert_eq!(1, privileges.all_values().count());
2603        assert_eq!(
2604            vec![MzAclItem {
2605                grantee: other_role,
2606                grantor: new_owner,
2607                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2608            }],
2609            privileges.all_values_owned().collect::<Vec<_>>()
2610        );
2611
2612        // older owner exists as grantee.
2613        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2614            MzAclItem {
2615                grantee: old_owner,
2616                grantor: other_role,
2617                acl_mode: AclMode::UPDATE,
2618            },
2619            MzAclItem {
2620                grantee: new_owner,
2621                grantor: other_role,
2622                acl_mode: AclMode::SELECT,
2623            },
2624        ]);
2625        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2626        assert_eq!(1, privileges.all_values().count());
2627        assert_eq!(
2628            vec![MzAclItem {
2629                grantee: new_owner,
2630                grantor: other_role,
2631                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2632            }],
2633            privileges.all_values_owned().collect::<Vec<_>>()
2634        );
2635
2636        // older owner exists as grantee and grantor.
2637        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2638            MzAclItem {
2639                grantee: old_owner,
2640                grantor: old_owner,
2641                acl_mode: AclMode::UPDATE,
2642            },
2643            MzAclItem {
2644                grantee: new_owner,
2645                grantor: new_owner,
2646                acl_mode: AclMode::SELECT,
2647            },
2648        ]);
2649        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2650        assert_eq!(1, privileges.all_values().count());
2651        assert_eq!(
2652            vec![MzAclItem {
2653                grantee: new_owner,
2654                grantor: new_owner,
2655                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2656            }],
2657            privileges.all_values_owned().collect::<Vec<_>>()
2658        );
2659    }
2660}